Merge origin/main into sa-processes

This commit is contained in:
Nathan Flurry 2026-03-06 12:26:13 -08:00
commit 0171e33873
61 changed files with 3140 additions and 840 deletions

View file

@ -93,6 +93,20 @@ fn map_error(err: AdapterError) -> Response {
"timeout",
"timed out waiting for agent response",
),
AdapterError::Exited { exit_code, stderr } => {
let detail = if let Some(stderr) = stderr {
format!(
"agent process exited before responding (exit_code: {:?}, stderr: {})",
exit_code, stderr
)
} else {
format!(
"agent process exited before responding (exit_code: {:?})",
exit_code
)
};
problem(StatusCode::BAD_GATEWAY, "agent_exited", &detail)
}
AdapterError::Write(write) => problem(
StatusCode::BAD_GATEWAY,
"write_failed",

View file

@ -32,6 +32,7 @@ async fn main() {
}
async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let started = std::time::Instant::now();
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
@ -41,6 +42,12 @@ async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.init();
let cli = Cli::parse();
tracing::info!(
host = %cli.host,
port = cli.port,
startup_ms = started.elapsed().as_millis() as u64,
"acp-http-adapter.run: starting server"
);
run_server(ServerConfig {
host: cli.host,
port: cli.port,

View file

@ -16,6 +16,7 @@ use tokio_stream::wrappers::BroadcastStream;
use crate::registry::LaunchSpec;
const RING_BUFFER_SIZE: usize = 1024;
const STDERR_TAIL_SIZE: usize = 16;
#[derive(Debug, Error)]
pub enum AdapterError {
@ -33,6 +34,11 @@ pub enum AdapterError {
Serialize(serde_json::Error),
#[error("failed to write subprocess stdin: {0}")]
Write(std::io::Error),
#[error("agent process exited before responding")]
Exited {
exit_code: Option<i32>,
stderr: Option<String>,
},
#[error("timeout waiting for response")]
Timeout,
}
@ -61,6 +67,7 @@ pub struct AdapterRuntime {
shutting_down: AtomicBool,
spawned_at: Instant,
first_stdout: Arc<AtomicBool>,
stderr_tail: Arc<Mutex<VecDeque<String>>>,
}
impl AdapterRuntime {
@ -120,6 +127,7 @@ impl AdapterRuntime {
shutting_down: AtomicBool::new(false),
spawned_at: spawn_start,
first_stdout: Arc::new(AtomicBool::new(false)),
stderr_tail: Arc::new(Mutex::new(VecDeque::with_capacity(STDERR_TAIL_SIZE))),
};
runtime.spawn_stdout_loop(stdout);
@ -198,6 +206,16 @@ impl AdapterRuntime {
"post: response channel dropped (agent process may have exited)"
);
self.pending.lock().await.remove(&key);
if let Some((exit_code, stderr)) = self.try_process_exit_info().await {
tracing::error!(
method = %method,
id = %key,
exit_code = ?exit_code,
stderr = ?stderr,
"post: agent process exited before response channel completed"
);
return Err(AdapterError::Exited { exit_code, stderr });
}
Err(AdapterError::Timeout)
}
Err(_) => {
@ -213,6 +231,16 @@ impl AdapterRuntime {
"post: TIMEOUT waiting for agent response"
);
self.pending.lock().await.remove(&key);
if let Some((exit_code, stderr)) = self.try_process_exit_info().await {
tracing::error!(
method = %method,
id = %key,
exit_code = ?exit_code,
stderr = ?stderr,
"post: agent process exited before timeout completed"
);
return Err(AdapterError::Exited { exit_code, stderr });
}
Err(AdapterError::Timeout)
}
}
@ -445,6 +473,7 @@ impl AdapterRuntime {
fn spawn_stderr_loop(&self, stderr: tokio::process::ChildStderr) {
let spawned_at = self.spawned_at;
let stderr_tail = self.stderr_tail.clone();
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
@ -452,6 +481,13 @@ impl AdapterRuntime {
while let Ok(Some(line)) = lines.next_line().await {
line_count += 1;
{
let mut tail = stderr_tail.lock().await;
tail.push_back(line.clone());
while tail.len() > STDERR_TAIL_SIZE {
tail.pop_front();
}
}
tracing::info!(
line_number = line_count,
age_ms = spawned_at.elapsed().as_millis() as u64,
@ -560,6 +596,28 @@ impl AdapterRuntime {
tracing::debug!(method = method, id = %id, "stdin: write+flush complete");
Ok(())
}
/// Checks whether the child process has already terminated.
///
/// Returns `Some((exit_code, stderr_tail))` when the process has exited,
/// or `None` when it is still running or its status could not be queried
/// (`try_wait` errors are treated the same as "still running").
async fn try_process_exit_info(&self) -> Option<(Option<i32>, Option<String>)> {
    // Scope the child lock so it is released before we take the
    // stderr_tail lock inside stderr_tail_summary.
    let status = {
        let mut child = self.child.lock().await;
        // try_wait is non-blocking; Err and Ok(None) both collapse to None.
        child.try_wait().ok().flatten()
    }?;
    let stderr = self.stderr_tail_summary().await;
    Some((status.code(), stderr))
}
/// Joins the buffered stderr tail lines into a single newline-separated
/// string, or returns `None` when no stderr output has been captured.
async fn stderr_tail_summary(&self) -> Option<String> {
    let tail = self.stderr_tail.lock().await;
    if tail.is_empty() {
        return None;
    }
    // Borrow each line rather than cloning every String just to join them;
    // the joined output is identical.
    Some(tail.iter().map(String::as_str).collect::<Vec<_>>().join("\n"))
}
}
fn id_key(value: &Value) -> String {

View file

@ -20,3 +20,4 @@ url.workspace = true
dirs.workspace = true
tempfile.workspace = true
time.workspace = true
tracing.workspace = true

View file

@ -4,6 +4,7 @@ use std::fs;
use std::io::{self, Read};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Instant;
use flate2::read::GzDecoder;
use reqwest::blocking::Client;
@ -78,7 +79,7 @@ impl AgentId {
fn agent_process_registry_id(self) -> Option<&'static str> {
match self {
AgentId::Claude => Some("claude-code-acp"),
AgentId::Claude => Some("claude-acp"),
AgentId::Codex => Some("codex-acp"),
AgentId::Opencode => Some("opencode"),
AgentId::Amp => Some("amp-acp"),
@ -90,7 +91,7 @@ impl AgentId {
fn agent_process_binary_hint(self) -> Option<&'static str> {
match self {
AgentId::Claude => Some("claude-code-acp"),
AgentId::Claude => Some("claude-agent-acp"),
AgentId::Codex => Some("codex-acp"),
AgentId::Opencode => Some("opencode"),
AgentId::Amp => Some("amp-acp"),
@ -321,6 +322,14 @@ impl AgentManager {
agent: AgentId,
options: InstallOptions,
) -> Result<InstallResult, AgentError> {
let install_started = Instant::now();
tracing::info!(
agent = agent.as_str(),
reinstall = options.reinstall,
native_version = ?options.version,
agent_process_version = ?options.agent_process_version,
"agent_manager.install: starting"
);
fs::create_dir_all(&self.install_dir)?;
fs::create_dir_all(self.install_dir.join("agent_processes"))?;
@ -345,10 +354,20 @@ impl AgentManager {
artifacts.push(artifact);
}
Ok(InstallResult {
let result = InstallResult {
artifacts,
already_installed,
})
};
tracing::info!(
agent = agent.as_str(),
already_installed = result.already_installed,
artifact_count = result.artifacts.len(),
total_ms = elapsed_ms(install_started),
"agent_manager.install: completed"
);
Ok(result)
}
pub fn is_installed(&self, agent: AgentId) -> bool {
@ -392,25 +411,41 @@ impl AgentManager {
&self,
agent: AgentId,
) -> Result<AgentProcessLaunchSpec, AgentError> {
let started = Instant::now();
if agent == AgentId::Mock {
return Ok(AgentProcessLaunchSpec {
let spec = AgentProcessLaunchSpec {
program: self.agent_process_path(agent),
args: Vec::new(),
env: HashMap::new(),
source: InstallSource::Builtin,
version: Some("builtin".to_string()),
});
};
tracing::info!(
agent = agent.as_str(),
source = ?spec.source,
total_ms = elapsed_ms(started),
"agent_manager.resolve_agent_process: resolved builtin"
);
return Ok(spec);
}
let launcher = self.agent_process_path(agent);
if launcher.exists() {
return Ok(AgentProcessLaunchSpec {
let spec = AgentProcessLaunchSpec {
program: launcher,
args: Vec::new(),
env: HashMap::new(),
source: InstallSource::LocalPath,
version: None,
});
};
tracing::info!(
agent = agent.as_str(),
source = ?spec.source,
program = %spec.program.display(),
total_ms = elapsed_ms(started),
"agent_manager.resolve_agent_process: resolved local launcher"
);
return Ok(spec);
}
if let Some(bin) = agent.agent_process_binary_hint().and_then(find_in_path) {
@ -419,29 +454,47 @@ impl AgentManager {
} else {
Vec::new()
};
return Ok(AgentProcessLaunchSpec {
let spec = AgentProcessLaunchSpec {
program: bin,
args,
env: HashMap::new(),
source: InstallSource::LocalPath,
version: None,
});
};
tracing::info!(
agent = agent.as_str(),
source = ?spec.source,
program = %spec.program.display(),
args = ?spec.args,
total_ms = elapsed_ms(started),
"agent_manager.resolve_agent_process: resolved PATH binary hint"
);
return Ok(spec);
}
if agent == AgentId::Opencode {
let native = self.resolve_binary(agent)?;
return Ok(AgentProcessLaunchSpec {
let spec = AgentProcessLaunchSpec {
program: native,
args: vec!["acp".to_string()],
env: HashMap::new(),
source: InstallSource::LocalPath,
version: None,
});
};
tracing::info!(
agent = agent.as_str(),
source = ?spec.source,
program = %spec.program.display(),
args = ?spec.args,
total_ms = elapsed_ms(started),
"agent_manager.resolve_agent_process: resolved opencode native"
);
return Ok(spec);
}
Err(AgentError::AgentProcessNotFound {
agent,
hint: Some("run install to provision ACP agent process".to_string()),
hint: Some(format!("run step 3: `sandbox-agent install-agent {agent}`")),
})
}
@ -454,11 +507,23 @@ impl AgentManager {
agent: AgentId,
options: &InstallOptions,
) -> Result<Option<InstalledArtifact>, AgentError> {
let started = Instant::now();
if !options.reinstall && self.native_installed(agent) {
tracing::info!(
agent = agent.as_str(),
total_ms = elapsed_ms(started),
"agent_manager.install_native: already installed"
);
return Ok(None);
}
let path = self.binary_path(agent);
tracing::info!(
agent = agent.as_str(),
path = %path.display(),
version_override = ?options.version,
"agent_manager.install_native: installing"
);
match agent {
AgentId::Claude => install_claude(&path, self.platform, options.version.as_deref())?,
AgentId::Codex => install_codex(&path, self.platform, options.version.as_deref())?,
@ -474,12 +539,22 @@ impl AgentManager {
}
}
Ok(Some(InstalledArtifact {
let artifact = InstalledArtifact {
kind: InstalledArtifactKind::NativeAgent,
path,
version: self.version(agent).ok().flatten(),
source: InstallSource::Fallback,
}))
};
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
version = ?artifact.version,
total_ms = elapsed_ms(started),
"agent_manager.install_native: completed"
);
Ok(Some(artifact))
}
fn install_agent_process(
@ -487,8 +562,14 @@ impl AgentManager {
agent: AgentId,
options: &InstallOptions,
) -> Result<Option<InstalledArtifact>, AgentError> {
let started = Instant::now();
if !options.reinstall {
if self.agent_process_status(agent).is_some() {
tracing::info!(
agent = agent.as_str(),
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process: already installed"
);
return Ok(None);
}
}
@ -496,22 +577,104 @@ impl AgentManager {
if agent == AgentId::Mock {
let path = self.agent_process_path(agent);
write_mock_agent_process_launcher(&path)?;
return Ok(Some(InstalledArtifact {
let artifact = InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path,
version: Some("builtin".to_string()),
source: InstallSource::Builtin,
}));
};
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process: installed builtin launcher"
);
return Ok(Some(artifact));
}
if let Some(artifact) = self.install_agent_process_from_registry(agent, options)? {
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
version = ?artifact.version,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process: installed from registry"
);
return Ok(Some(artifact));
}
let artifact = self.install_agent_process_fallback(agent, options)?;
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
version = ?artifact.version,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process: installed from fallback"
);
Ok(Some(artifact))
}
/// Installs an agent's ACP process by running `npm install` into a private
/// per-agent prefix, then writing a launcher script that execs the installed
/// binary. Each phase (npm install, launcher write, verify) is timed and
/// logged via `tracing`.
///
/// Returns the installed launcher as an `InstalledArtifact`, or an
/// `AgentError` if any phase fails.
fn install_npm_agent_process_package(
    &self,
    agent: AgentId,
    package: &str,
    args: &[String],
    env: &HashMap<String, String>,
    source: InstallSource,
    version: Option<String>,
) -> Result<InstalledArtifact, AgentError> {
    let started = Instant::now();
    let root = self.agent_process_storage_dir(agent);
    // Wipe any prior install so stale node_modules never shadow the new one.
    if root.exists() {
        fs::remove_dir_all(&root)?;
    }
    fs::create_dir_all(&root)?;

    let npm_install_started = Instant::now();
    install_npm_package(&root, package, agent)?;
    let npm_install_ms = elapsed_ms(npm_install_started);

    // The binary hint names the executable npm places under node_modules/.bin.
    let bin_name = agent.agent_process_binary_hint().ok_or_else(|| {
        AgentError::ExtractFailed(format!(
            "missing executable hint for agent process package: {agent}"
        ))
    })?;
    let cmd_path = npm_bin_path(&root, bin_name);
    if !cmd_path.exists() {
        return Err(AgentError::ExtractFailed(format!(
            "installed package missing executable: {}",
            cmd_path.display()
        )));
    }

    // Write the launcher, then verify it actually starts before reporting
    // the install as complete.
    let launcher = self.agent_process_path(agent);
    let write_started = Instant::now();
    write_exec_agent_process_launcher(&launcher, &cmd_path, args, env)?;
    let write_ms = elapsed_ms(write_started);

    let verify_started = Instant::now();
    verify_command(&launcher, &[])?;
    let verify_ms = elapsed_ms(verify_started);

    tracing::info!(
        agent = agent.as_str(),
        package = %package,
        cmd = %cmd_path.display(),
        npm_install_ms = npm_install_ms,
        write_ms = write_ms,
        verify_ms = verify_ms,
        total_ms = elapsed_ms(started),
        "agent_manager.install_npm_agent_process_package: completed"
    );
    Ok(InstalledArtifact {
        kind: InstalledArtifactKind::AgentProcess,
        path: launcher,
        version,
        source,
    })
}
fn agent_process_status(&self, agent: AgentId) -> Option<AgentProcessStatus> {
if agent == AgentId::Mock {
return Some(AgentProcessStatus {
@ -540,59 +703,111 @@ impl AgentManager {
agent: AgentId,
options: &InstallOptions,
) -> Result<Option<InstalledArtifact>, AgentError> {
let started = Instant::now();
let Some(registry_id) = agent.agent_process_registry_id() else {
return Ok(None);
};
tracing::info!(
agent = agent.as_str(),
registry_id = registry_id,
url = %self.registry_url,
"agent_manager.install_agent_process_from_registry: fetching registry"
);
let fetch_started = Instant::now();
let registry = fetch_registry(&self.registry_url)?;
tracing::info!(
agent = agent.as_str(),
registry_id = registry_id,
fetch_ms = elapsed_ms(fetch_started),
"agent_manager.install_agent_process_from_registry: registry fetched"
);
let Some(entry) = registry.agents.into_iter().find(|a| a.id == registry_id) else {
tracing::info!(
agent = agent.as_str(),
registry_id = registry_id,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_from_registry: missing entry"
);
return Ok(None);
};
if let Some(npx) = entry.distribution.npx {
let package =
apply_npx_version_override(&npx.package, options.agent_process_version.as_deref());
let launcher = self.agent_process_path(agent);
write_npx_agent_process_launcher(&launcher, &package, &npx.args, &npx.env)?;
verify_command(&launcher, &[])?;
return Ok(Some(InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options
.agent_process_version
.clone()
.or(entry.version)
.or(extract_npx_version(&package)),
source: InstallSource::Registry,
}));
let version = options
.agent_process_version
.clone()
.or(entry.version)
.or(extract_npx_version(&package));
let artifact = self.install_npm_agent_process_package(
agent,
&package,
&npx.args,
&npx.env,
InstallSource::Registry,
version,
)?;
tracing::info!(
agent = agent.as_str(),
package = %package,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_from_registry: npm package installed"
);
return Ok(Some(artifact));
}
if let Some(binary) = entry.distribution.binary {
let key = self.platform.registry_key();
if let Some(target) = binary.get(key) {
let archive_url = Url::parse(&target.archive)?;
let download_started = Instant::now();
let payload = download_bytes(&archive_url)?;
let download_ms = elapsed_ms(download_started);
let root = self.agent_process_storage_dir(agent);
if root.exists() {
fs::remove_dir_all(&root)?;
}
fs::create_dir_all(&root)?;
let unpack_started = Instant::now();
unpack_archive(&payload, &archive_url, &root)?;
let unpack_ms = elapsed_ms(unpack_started);
let cmd_path = resolve_extracted_command(&root, &target.cmd)?;
let launcher = self.agent_process_path(agent);
let write_started = Instant::now();
write_exec_agent_process_launcher(&launcher, &cmd_path, &target.args, &target.env)?;
let write_ms = elapsed_ms(write_started);
let verify_started = Instant::now();
verify_command(&launcher, &[])?;
let verify_ms = elapsed_ms(verify_started);
return Ok(Some(InstalledArtifact {
let artifact = InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options.agent_process_version.clone().or(entry.version),
source: InstallSource::Registry,
}));
};
tracing::info!(
agent = agent.as_str(),
archive_url = %archive_url,
download_ms = download_ms,
unpack_ms = unpack_ms,
write_ms = write_ms,
verify_ms = verify_ms,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_from_registry: binary launcher installed"
);
return Ok(Some(artifact));
}
}
tracing::info!(
agent = agent.as_str(),
registry_id = registry_id,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_from_registry: no compatible distribution"
);
Ok(None)
}
@ -601,24 +816,44 @@ impl AgentManager {
agent: AgentId,
options: &InstallOptions,
) -> Result<InstalledArtifact, AgentError> {
let launcher = self.agent_process_path(agent);
match agent {
let started = Instant::now();
let artifact = match agent {
AgentId::Claude => {
let package = fallback_npx_package(
"@zed-industries/claude-code-acp",
"@zed-industries/claude-agent-acp",
options.agent_process_version.as_deref(),
);
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
}
AgentId::Codex => {
let package = fallback_npx_package(
"@zed-industries/codex-acp",
options.agent_process_version.as_deref(),
);
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
}
AgentId::Opencode => {
let launcher = self.agent_process_path(agent);
let native = self.resolve_binary(agent)?;
write_exec_agent_process_launcher(
&launcher,
@ -626,37 +861,82 @@ impl AgentManager {
&["acp".to_string()],
&HashMap::new(),
)?;
verify_command(&launcher, &[])?;
InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options.agent_process_version.clone(),
source: InstallSource::Fallback,
}
}
AgentId::Amp => {
let package =
fallback_npx_package("amp-acp", options.agent_process_version.as_deref());
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
}
AgentId::Pi => {
let package =
fallback_npx_package("pi-acp", options.agent_process_version.as_deref());
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
}
AgentId::Cursor => {
let package = fallback_npx_package(
"@blowmage/cursor-agent-acp",
options.agent_process_version.as_deref(),
);
write_npx_agent_process_launcher(&launcher, &package, &[], &HashMap::new())?;
self.install_npm_agent_process_package(
agent,
&package,
&[],
&HashMap::new(),
InstallSource::Fallback,
options
.agent_process_version
.clone()
.or(extract_npx_version(&package)),
)?
}
AgentId::Mock => {
let launcher = self.agent_process_path(agent);
write_mock_agent_process_launcher(&launcher)?;
InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options.agent_process_version.clone(),
source: InstallSource::Fallback,
}
}
}
};
verify_command(&launcher, &[])?;
tracing::info!(
agent = agent.as_str(),
source = ?artifact.source,
version = ?artifact.version,
total_ms = elapsed_ms(started),
"agent_manager.install_agent_process_fallback: launcher installed"
);
Ok(InstalledArtifact {
kind: InstalledArtifactKind::AgentProcess,
path: launcher,
version: options.agent_process_version.clone(),
source: InstallSource::Fallback,
})
Ok(artifact)
}
}
@ -732,6 +1012,10 @@ pub enum AgentError {
RegistryParse(String),
#[error("command verification failed: {0}")]
VerifyFailed(String),
#[error(
"npm is required to install {agent}. install npm, then run step 3: `sandbox-agent install-agent {agent}`"
)]
MissingNpm { agent: AgentId },
}
fn fallback_npx_package(base: &str, version: Option<&str>) -> String {
@ -779,15 +1063,36 @@ fn split_package_version(package: &str) -> Option<(&str, &str)> {
}
}
fn write_npx_agent_process_launcher(
path: &Path,
package: &str,
args: &[String],
env: &HashMap<String, String>,
) -> Result<(), AgentError> {
let mut command = vec!["npx".to_string(), "-y".to_string(), package.to_string()];
command.extend(args.iter().cloned());
write_launcher(path, &command, env)
/// Runs `npm install <package>` with `root` as the npm prefix, discarding
/// npm's stdout/stderr.
///
/// A missing `npm` executable maps to `AgentError::MissingNpm` so callers can
/// surface actionable install instructions; a non-zero exit status or any
/// other spawn failure maps to `AgentError::VerifyFailed`.
fn install_npm_package(root: &Path, package: &str, agent: AgentId) -> Result<(), AgentError> {
    let run = Command::new("npm")
        .arg("install")
        .arg("--no-audit")
        .arg("--no-fund")
        .arg("--prefix")
        .arg(root)
        .arg(package)
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    match run {
        Err(err) => {
            if err.kind() == io::ErrorKind::NotFound {
                Err(AgentError::MissingNpm { agent })
            } else {
                Err(AgentError::VerifyFailed(format!(
                    "failed to execute npm for {agent}: {err}"
                )))
            }
        }
        Ok(status) if status.success() => Ok(()),
        Ok(status) => Err(AgentError::VerifyFailed(format!(
            "npm install failed for {agent} with status {status}. run step 3: `sandbox-agent install-agent {agent}`"
        ))),
    }
}
/// Resolves the path of an npm-installed executable under `root`'s
/// `node_modules/.bin` directory, appending a `.cmd` extension on Windows
/// where npm writes shim scripts instead of symlinks.
fn npm_bin_path(root: &Path, bin_name: &str) -> PathBuf {
    let base = root.join("node_modules").join(".bin").join(bin_name);
    if cfg!(windows) {
        base.with_extension("cmd")
    } else {
        base
    }
}
fn write_exec_agent_process_launcher(
@ -998,6 +1303,15 @@ fn install_claude(
platform: Platform,
version: Option<&str>,
) -> Result<(), AgentError> {
let started = Instant::now();
tracing::info!(
path = %path.display(),
platform = ?platform,
version_override = ?version,
"agent_manager.install_claude: starting"
);
let version_started = Instant::now();
let version = match version {
Some(version) => version.to_string(),
None => {
@ -1009,6 +1323,7 @@ fn install_claude(
text.trim().to_string()
}
};
let version_ms = elapsed_ms(version_started);
let platform_segment = match platform {
Platform::LinuxX64 => "linux-x64",
@ -1023,12 +1338,26 @@ fn install_claude(
let url = Url::parse(&format!(
"https://storage.googleapis.com/claude-code-dist-86c565f3-f756-42ad-8dfa-d59b1c096819/claude-code-releases/{version}/{platform_segment}/claude"
))?;
let download_started = Instant::now();
let bytes = download_bytes(&url)?;
let download_ms = elapsed_ms(download_started);
let write_started = Instant::now();
write_executable(path, &bytes)?;
tracing::info!(
version = %version,
url = %url,
bytes = bytes.len(),
version_ms = version_ms,
download_ms = download_ms,
write_ms = elapsed_ms(write_started),
total_ms = elapsed_ms(started),
"agent_manager.install_claude: completed"
);
Ok(())
}
fn install_amp(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> {
let started = Instant::now();
let version = match version {
Some(version) => version.to_string(),
None => {
@ -1053,12 +1382,25 @@ fn install_amp(path: &Path, platform: Platform, version: Option<&str>) -> Result
let url = Url::parse(&format!(
"https://storage.googleapis.com/amp-public-assets-prod-0/cli/{version}/amp-{platform_segment}"
))?;
let download_started = Instant::now();
let bytes = download_bytes(&url)?;
let download_ms = elapsed_ms(download_started);
let write_started = Instant::now();
write_executable(path, &bytes)?;
tracing::info!(
version = %version,
url = %url,
bytes = bytes.len(),
download_ms = download_ms,
write_ms = elapsed_ms(write_started),
total_ms = elapsed_ms(started),
"agent_manager.install_amp: completed"
);
Ok(())
}
fn install_codex(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> {
let started = Instant::now();
let target = match platform {
Platform::LinuxX64 | Platform::LinuxX64Musl => "x86_64-unknown-linux-musl",
Platform::LinuxArm64 => "aarch64-unknown-linux-musl",
@ -1077,11 +1419,15 @@ fn install_codex(path: &Path, platform: Platform, version: Option<&str>) -> Resu
))?,
};
let download_started = Instant::now();
let bytes = download_bytes(&url)?;
let download_ms = elapsed_ms(download_started);
let temp_dir = tempfile::tempdir()?;
let unpack_started = Instant::now();
let cursor = io::Cursor::new(bytes);
let mut archive = tar::Archive::new(GzDecoder::new(cursor));
archive.unpack(temp_dir.path())?;
let unpack_ms = elapsed_ms(unpack_started);
let expected = if cfg!(windows) {
format!("codex-{target}.exe")
@ -1091,7 +1437,17 @@ fn install_codex(path: &Path, platform: Platform, version: Option<&str>) -> Resu
let binary = find_file_recursive(temp_dir.path(), &expected)?
.ok_or_else(|| AgentError::ExtractFailed(format!("missing {expected}")))?;
let move_started = Instant::now();
move_executable(&binary, path)?;
tracing::info!(
url = %url,
target = target,
download_ms = download_ms,
unpack_ms = unpack_ms,
move_ms = elapsed_ms(move_started),
total_ms = elapsed_ms(started),
"agent_manager.install_codex: completed"
);
Ok(())
}
@ -1100,7 +1456,15 @@ fn install_opencode(
platform: Platform,
version: Option<&str>,
) -> Result<(), AgentError> {
match platform {
let started = Instant::now();
tracing::info!(
path = %path.display(),
platform = ?platform,
version_override = ?version,
"agent_manager.install_opencode: starting"
);
let result = match platform {
Platform::MacosArm64 => {
let url = match version {
Some(version) => Url::parse(&format!(
@ -1141,22 +1505,46 @@ fn install_opencode(
))?,
};
let download_started = Instant::now();
let bytes = download_bytes(&url)?;
let download_ms = elapsed_ms(download_started);
let temp_dir = tempfile::tempdir()?;
let unpack_started = Instant::now();
let cursor = io::Cursor::new(bytes);
let mut archive = tar::Archive::new(GzDecoder::new(cursor));
archive.unpack(temp_dir.path())?;
let unpack_ms = elapsed_ms(unpack_started);
let binary = find_file_recursive(temp_dir.path(), "opencode")
.or_else(|_| find_file_recursive(temp_dir.path(), "opencode.exe"))?
.ok_or_else(|| AgentError::ExtractFailed("missing opencode".to_string()))?;
let move_started = Instant::now();
move_executable(&binary, path)?;
tracing::info!(
url = %url,
download_ms = download_ms,
unpack_ms = unpack_ms,
move_ms = elapsed_ms(move_started),
"agent_manager.install_opencode: tarball extraction complete"
);
Ok(())
}
};
if result.is_ok() {
tracing::info!(
total_ms = elapsed_ms(started),
"agent_manager.install_opencode: completed"
);
}
result
}
fn install_zip_binary(path: &Path, url: &Url, binary_name: &str) -> Result<(), AgentError> {
let started = Instant::now();
let download_started = Instant::now();
let bytes = download_bytes(url)?;
let download_ms = elapsed_ms(download_started);
let reader = io::Cursor::new(bytes);
let mut archive =
zip::ZipArchive::new(reader).map_err(|err| AgentError::ExtractFailed(err.to_string()))?;
@ -1173,7 +1561,16 @@ fn install_zip_binary(path: &Path, url: &Url, binary_name: &str) -> Result<(), A
let out_path = temp_dir.path().join(binary_name);
let mut out_file = fs::File::create(&out_path)?;
io::copy(&mut file, &mut out_file)?;
let move_started = Instant::now();
move_executable(&out_path, path)?;
tracing::info!(
url = %url,
binary_name = binary_name,
download_ms = download_ms,
move_ms = elapsed_ms(move_started),
total_ms = elapsed_ms(started),
"agent_manager.install_zip_binary: completed"
);
return Ok(());
}
Err(AgentError::ExtractFailed(format!("missing {binary_name}")))
@ -1231,6 +1628,10 @@ fn find_file_recursive(dir: &Path, filename: &str) -> Result<Option<PathBuf>, Ag
Ok(None)
}
/// Milliseconds elapsed since `start`, truncated into a `u64` for use as a
/// structured log field.
fn elapsed_ms(start: Instant) -> u64 {
    let millis = start.elapsed().as_millis();
    millis as u64
}
fn parse_version_output(output: &std::process::Output) -> Option<String> {
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
@ -1265,6 +1666,38 @@ mod tests {
}
}
/// Writes a stub `npm` executable for tests: it parses the `--prefix`
/// argument and creates executable shims under `<prefix>/node_modules/.bin`
/// for every agent-process binary the installer may look up.
///
/// Fix: the shim list now includes `claude-agent-acp`, which is the binary
/// hint `AgentId::Claude` resolves through `npm_bin_path`; without it the
/// installer's `cmd_path.exists()` check fails for Claude. The legacy
/// `claude-code-acp` name is kept for compatibility.
fn write_fake_npm(path: &Path) {
    write_exec(
        path,
        r#"#!/usr/bin/env sh
set -e
prefix=""
while [ "$#" -gt 0 ]; do
case "$1" in
install|--no-audit|--no-fund)
shift
;;
--prefix)
prefix="$2"
shift 2
;;
*)
shift
;;
esac
done
[ -n "$prefix" ] || exit 1
mkdir -p "$prefix/node_modules/.bin"
for bin in claude-code-acp claude-agent-acp codex-acp amp-acp pi-acp cursor-agent-acp; do
echo '#!/usr/bin/env sh' > "$prefix/node_modules/.bin/$bin"
echo 'exit 0' >> "$prefix/node_modules/.bin/$bin"
chmod +x "$prefix/node_modules/.bin/$bin"
done
exit 0
"#,
    );
}
fn env_lock() -> &'static Mutex<()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
@ -1407,7 +1840,7 @@ mod tests {
let bin_dir = temp_dir.path().join("bin");
fs::create_dir_all(&bin_dir).expect("create bin dir");
write_exec(&bin_dir.join("npx"), "#!/usr/bin/env sh\nexit 0\n");
write_fake_npm(&bin_dir.join("npm"));
let original_path = std::env::var_os("PATH").unwrap_or_default();
let mut paths = vec![bin_dir.clone()];
@ -1455,8 +1888,8 @@ mod tests {
let launcher =
fs::read_to_string(manager.agent_process_path(AgentId::Codex)).expect("launcher");
assert!(
launcher.contains("@example/codex-acp@9.9.9"),
"launcher should include overridden package version"
launcher.contains("node_modules/.bin/codex-acp"),
"launcher should invoke installed codex executable"
);
}
@ -1474,7 +1907,7 @@ mod tests {
let bin_dir = temp_dir.path().join("bin");
fs::create_dir_all(&bin_dir).expect("create bin dir");
write_exec(&bin_dir.join("npx"), "#!/usr/bin/env sh\nexit 0\n");
write_fake_npm(&bin_dir.join("npm"));
let original_path = std::env::var_os("PATH").unwrap_or_default();
let mut paths = vec![bin_dir.clone()];
@ -1496,6 +1929,39 @@ mod tests {
assert_eq!(agent_process_artifact.source, InstallSource::Fallback);
}
#[test]
fn install_returns_missing_npm_error_for_npm_backed_agents() {
    // Serialize env mutation across tests: PATH is process-global state.
    let _env_lock = env_lock().lock().expect("env lock");
    let temp_dir = tempfile::tempdir().expect("create tempdir");
    let mut manager = AgentManager::with_platform(temp_dir.path(), Platform::LinuxX64);
    // Pretend the native codex binary is already installed so only the
    // agent-process (npm-backed) install step runs.
    write_exec(
        &manager.binary_path(AgentId::Codex),
        "#!/usr/bin/env sh\nexit 0\n",
    );

    // Point PATH at an empty bin dir: no `npm` (and no `npx`) resolvable.
    let bin_dir = temp_dir.path().join("bin");
    fs::create_dir_all(&bin_dir).expect("create bin dir");
    let original_path = std::env::var_os("PATH").unwrap_or_default();
    let combined_path = std::env::join_paths([bin_dir]).expect("join PATH");
    let _path_guard = EnvVarGuard::set("PATH", &combined_path);

    // An empty registry forces the npm fallback path for the agent process.
    manager.registry_url = serve_registry_once(serde_json::json!({ "agents": [] }));

    let error = manager
        .install(AgentId::Codex, InstallOptions::default())
        .expect_err("install should fail without npm");
    match error {
        AgentError::MissingNpm { agent } => assert_eq!(agent, AgentId::Codex),
        other => panic!("expected MissingNpm, got {other:?}"),
    }
    // NOTE(review): `original_path` is never used to restore PATH here, so
    // this drop is a no-op; restoration presumably happens via EnvVarGuard's
    // Drop impl — confirm, then this binding can likely be removed.
    drop(original_path);
}
#[test]
fn reinstall_mock_returns_agent_process_artifact() {
let temp_dir = tempfile::tempdir().expect("create tempdir");
@ -1522,7 +1988,7 @@ mod tests {
}
#[test]
fn install_pi_skips_native_and_writes_fallback_npx_launcher() {
fn install_pi_skips_native_and_installs_fallback_npm_launcher() {
let _env_lock = env_lock().lock().expect("env lock");
let temp_dir = tempfile::tempdir().expect("create tempdir");
@ -1530,7 +1996,7 @@ mod tests {
let bin_dir = temp_dir.path().join("bin");
fs::create_dir_all(&bin_dir).expect("create bin dir");
write_exec(&bin_dir.join("npx"), "#!/usr/bin/env sh\nexit 0\n");
write_fake_npm(&bin_dir.join("npm"));
let original_path = std::env::var_os("PATH").unwrap_or_default();
let mut paths = vec![bin_dir.clone()];
@ -1564,8 +2030,8 @@ mod tests {
let launcher =
fs::read_to_string(manager.agent_process_path(AgentId::Pi)).expect("read pi launcher");
assert!(
launcher.contains("pi-acp"),
"pi launcher should reference pi-acp package"
launcher.contains("node_modules/.bin/pi-acp"),
"pi launcher should use installed pi executable"
);
// resolve_agent_process should now find it.
@ -1590,7 +2056,7 @@ mod tests {
}
#[test]
fn install_cursor_skips_native_and_writes_fallback_npx_launcher() {
fn install_cursor_skips_native_and_installs_fallback_npm_launcher() {
let _env_lock = env_lock().lock().expect("env lock");
let temp_dir = tempfile::tempdir().expect("create tempdir");
@ -1598,7 +2064,7 @@ mod tests {
let bin_dir = temp_dir.path().join("bin");
fs::create_dir_all(&bin_dir).expect("create bin dir");
write_exec(&bin_dir.join("npx"), "#!/usr/bin/env sh\nexit 0\n");
write_fake_npm(&bin_dir.join("npm"));
let original_path = std::env::var_os("PATH").unwrap_or_default();
let mut paths = vec![bin_dir.clone()];
@ -1630,8 +2096,8 @@ mod tests {
let launcher = fs::read_to_string(manager.agent_process_path(AgentId::Cursor))
.expect("read cursor launcher");
assert!(
launcher.contains("@blowmage/cursor-agent-acp"),
"cursor launcher should reference @blowmage/cursor-agent-acp package"
launcher.contains("node_modules/.bin/cursor-agent-acp"),
"cursor launcher should use installed cursor executable"
);
let spec = manager

View file

@ -165,7 +165,7 @@ impl AcpProxyRuntime {
error = %err,
"acp_proxy: POST → error"
);
Err(map_adapter_error(err))
Err(map_adapter_error(err, Some(instance.agent)))
}
}
}
@ -277,27 +277,28 @@ impl AcpProxyRuntime {
server_id: &str,
agent: AgentId,
) -> Result<Arc<ProxyInstance>, SandboxError> {
let start = std::time::Instant::now();
let total_started = std::time::Instant::now();
tracing::info!(
server_id = server_id,
agent = agent.as_str(),
"create_instance: starting"
);
let install_started = std::time::Instant::now();
self.ensure_installed(agent).await?;
let install_elapsed = start.elapsed();
tracing::info!(
server_id = server_id,
agent = agent.as_str(),
install_ms = install_elapsed.as_millis() as u64,
install_ms = install_started.elapsed().as_millis() as u64,
"create_instance: agent installed/verified"
);
let resolve_started = std::time::Instant::now();
let manager = self.inner.agent_manager.clone();
let launch = tokio::task::spawn_blocking(move || manager.resolve_agent_process(agent))
.await
.map_err(|err| SandboxError::StreamError {
message: format!("failed to resolve ACP agent process launch spec: {err}"),
message: format!("failed to resolve agent process launch spec: {err}"),
})?
.map_err(|err| SandboxError::StreamError {
message: err.to_string(),
@ -308,10 +309,11 @@ impl AcpProxyRuntime {
agent = agent.as_str(),
program = ?launch.program,
args = ?launch.args,
resolve_ms = start.elapsed().as_millis() as u64,
resolve_ms = resolve_started.elapsed().as_millis() as u64,
"create_instance: launch spec resolved, spawning"
);
let spawn_started = std::time::Instant::now();
let runtime = AdapterRuntime::start(
LaunchSpec {
program: launch.program,
@ -321,12 +323,13 @@ impl AcpProxyRuntime {
self.inner.request_timeout,
)
.await
.map_err(map_adapter_error)?;
.map_err(|err| map_adapter_error(err, Some(agent)))?;
let total_ms = start.elapsed().as_millis() as u64;
let total_ms = total_started.elapsed().as_millis() as u64;
tracing::info!(
server_id = server_id,
agent = agent.as_str(),
spawn_ms = spawn_started.elapsed().as_millis() as u64,
total_ms = total_ms,
"create_instance: ready"
);
@ -340,16 +343,27 @@ impl AcpProxyRuntime {
}
async fn ensure_installed(&self, agent: AgentId) -> Result<(), SandboxError> {
let started = std::time::Instant::now();
if self.inner.require_preinstall {
if !self.is_ready(agent).await {
return Err(SandboxError::AgentNotInstalled {
agent: agent.as_str().to_string(),
});
}
tracing::info!(
agent = agent.as_str(),
total_ms = started.elapsed().as_millis() as u64,
"ensure_installed: preinstall requirement satisfied"
);
return Ok(());
}
if self.is_ready(agent).await {
tracing::info!(
agent = agent.as_str(),
total_ms = started.elapsed().as_millis() as u64,
"ensure_installed: already ready"
);
return Ok(());
}
@ -363,9 +377,19 @@ impl AcpProxyRuntime {
let _guard = lock.lock().await;
if self.is_ready(agent).await {
tracing::info!(
agent = agent.as_str(),
total_ms = started.elapsed().as_millis() as u64,
"ensure_installed: became ready while waiting for lock"
);
return Ok(());
}
tracing::info!(
agent = agent.as_str(),
"ensure_installed: installing missing artifacts"
);
let install_started = std::time::Instant::now();
let manager = self.inner.agent_manager.clone();
tokio::task::spawn_blocking(move || manager.install(agent, InstallOptions::default()))
.await
@ -378,6 +402,12 @@ impl AcpProxyRuntime {
stderr: Some(err.to_string()),
})?;
tracing::info!(
agent = agent.as_str(),
install_ms = install_started.elapsed().as_millis() as u64,
total_ms = started.elapsed().as_millis() as u64,
"ensure_installed: install complete"
);
Ok(())
}
@ -432,7 +462,7 @@ impl AcpDispatch for AcpProxyRuntime {
}
}
fn map_adapter_error(err: AdapterError) -> SandboxError {
fn map_adapter_error(err: AdapterError, agent: Option<AgentId>) -> SandboxError {
match err {
AdapterError::InvalidEnvelope => SandboxError::InvalidRequest {
message: "request body must be a JSON-RPC object".to_string(),
@ -446,6 +476,29 @@ fn map_adapter_error(err: AdapterError) -> SandboxError {
AdapterError::Write(error) => SandboxError::StreamError {
message: format!("failed writing to agent stdin: {error}"),
},
AdapterError::Exited { exit_code, stderr } => {
if let Some(agent) = agent {
SandboxError::AgentProcessExited {
agent: agent.as_str().to_string(),
exit_code,
stderr,
}
} else {
SandboxError::StreamError {
message: if let Some(stderr) = stderr {
format!(
"agent process exited before responding (exit_code: {:?}, stderr: {})",
exit_code, stderr
)
} else {
format!(
"agent process exited before responding (exit_code: {:?})",
exit_code
)
},
}
}
}
AdapterError::Spawn(error) => SandboxError::StreamError {
message: format!("failed to start agent process: {error}"),
},

View file

@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::io::Write;
use std::path::PathBuf;
use std::process::Command as ProcessCommand;
@ -24,7 +24,7 @@ use sandbox_agent_agent_credentials::{
ProviderCredentials,
};
use sandbox_agent_agent_management::agents::{AgentId, AgentManager, InstallOptions};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use thiserror::Error;
use tower_http::cors::{Any, CorsLayer};
@ -220,6 +220,8 @@ pub struct AgentsArgs {
pub enum AgentsCommand {
/// List all agents and install status.
List(ClientArgs),
/// Emit JSON report of model/mode/thought options for all agents.
Report(ClientArgs),
/// Install or reinstall an agent.
Install(ApiInstallAgentArgs),
}
@ -475,6 +477,7 @@ fn run_agents(command: &AgentsCommand, cli: &CliConfig) -> Result<(), CliError>
let result = call_acp_extension(&ctx, ACP_EXTENSION_AGENT_LIST_METHOD, json!({}))?;
write_stdout_line(&serde_json::to_string_pretty(&result)?)
}
AgentsCommand::Report(args) => run_agents_report(args, cli),
AgentsCommand::Install(args) => {
let ctx = ClientContext::new(cli, &args.client)?;
let mut params = serde_json::Map::new();
@ -498,6 +501,223 @@ fn run_agents(command: &AgentsCommand, cli: &CliConfig) -> Result<(), CliError>
}
}
/// Wire shape of the agents listing endpoint response consumed by the
/// `agents report` subcommand.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AgentListApiResponse {
    // One entry per agent known to the server.
    agents: Vec<AgentListApiAgent>,
}
/// Per-agent record from the agents listing endpoint.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AgentListApiAgent {
    // Agent identifier string (e.g. "codex" in the tests below).
    id: String,
    // Whether the agent is reported as installed on the server.
    installed: bool,
    // Error reported while loading the agent's config, if any.
    #[serde(default)]
    config_error: Option<String>,
    // Raw `configOptions` values; each is parsed leniently into
    // `RawConfigOption` by the report builder, and unparseable ones are skipped.
    #[serde(default)]
    config_options: Option<Vec<Value>>,
}
/// Lenient view of a single config option object. Every field defaults so one
/// malformed or partial option never fails deserialization of the report input.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawConfigOption {
    // Option identifier; used as a category fallback when `category` is absent.
    #[serde(default)]
    id: Option<String>,
    // Category label ("model", "mode", "thought_level", ...).
    #[serde(default)]
    category: Option<String>,
    // Currently-selected value, if the agent reports one.
    #[serde(default)]
    current_value: Option<Value>,
    // Selectable choices for this option.
    #[serde(default)]
    options: Vec<RawConfigOptionChoice>,
}
/// One selectable choice inside a raw config option.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RawConfigOptionChoice {
    // Choice value; defaults to `Value::Null`, which the accumulator skips.
    #[serde(default)]
    value: Value,
    // Optional human-readable label for the choice.
    #[serde(default)]
    name: Option<String>,
}
/// Top-level JSON document emitted by `agents report`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AgentConfigReport {
    // Unix-epoch milliseconds at generation time; 0 if the clock precedes the epoch.
    generated_at_ms: u128,
    // Server endpoint the listing was fetched from.
    endpoint: String,
    agents: Vec<AgentConfigReportEntry>,
}
/// Report entry for a single agent: install status plus the three config
/// categories the report surfaces.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AgentConfigReportEntry {
    id: String,
    installed: bool,
    // `#[serde(default)]` removed: it only affects Deserialize and this struct
    // is Serialize-only, so the attribute was dead weight.
    #[serde(skip_serializing_if = "Option::is_none")]
    config_error: Option<String>,
    models: AgentConfigCategoryReport,
    modes: AgentConfigCategoryReport,
    thought_levels: AgentConfigCategoryReport,
}
/// One config category (models, modes, or thought levels) in the report.
#[derive(Debug, Serialize, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
struct AgentConfigCategoryReport {
    // `#[serde(default)]` removed: it only affects Deserialize and this struct
    // is Serialize-only, so the attribute was dead weight.
    #[serde(skip_serializing_if = "Option::is_none")]
    current_value: Option<String>,
    // Sorted, deduplicated selectable values (built from a BTreeMap).
    values: Vec<AgentConfigValueReport>,
}
/// A single selectable value with its optional display name.
#[derive(Debug, Serialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
struct AgentConfigValueReport {
    value: String,
    // `#[serde(default)]` removed: it only affects Deserialize and this struct
    // is Serialize-only, so the attribute was dead weight.
    #[serde(skip_serializing_if = "Option::is_none")]
    name: Option<String>,
}
/// Config-option categories surfaced by the report; options classifying to
/// none of these are ignored by the builder.
#[derive(Clone, Copy)]
enum ConfigReportCategory {
    Model,
    Mode,
    ThoughtLevel,
}
/// Accumulates the values seen for one config category across options.
#[derive(Default)]
struct CategoryAccumulator {
    // First non-null currentValue observed wins.
    current_value: Option<String>,
    // value -> optional display name; BTreeMap gives sorted, deduplicated output.
    values: BTreeMap<String, Option<String>>,
}
impl CategoryAccumulator {
    /// Folds one raw config option into the accumulator. The first non-null
    /// `currentValue` seen wins; choice values are deduplicated, keeping the
    /// first non-empty display name observed for each value.
    fn absorb(&mut self, option: &RawConfigOption) {
        if self.current_value.is_none() {
            self.current_value = config_value_to_string(option.current_value.as_ref());
        }
        for choice in &option.options {
            let value = match config_value_to_string(Some(&choice.value)) {
                Some(value) => value,
                // Null/absent values are not reportable choices.
                None => continue,
            };
            let display_name = choice
                .name
                .as_deref()
                .map(str::trim)
                .filter(|trimmed| !trimmed.is_empty())
                .map(str::to_string);
            let slot = self.values.entry(value).or_insert(None);
            if slot.is_none() {
                if let Some(label) = display_name {
                    *slot = Some(label);
                }
            }
        }
    }

    /// Consumes the accumulator into a report category, making sure the
    /// current value always appears among the selectable values.
    fn into_report(mut self) -> AgentConfigCategoryReport {
        if let Some(current) = self.current_value.clone() {
            self.values.entry(current).or_insert(None);
        }
        let mut values = Vec::with_capacity(self.values.len());
        for (value, name) in self.values {
            values.push(AgentConfigValueReport { value, name });
        }
        AgentConfigCategoryReport {
            current_value: self.current_value,
            values,
        }
    }
}
/// Handles `agents report`: fetches the agent listing (with config options)
/// and prints the derived model/mode/thought-level report as pretty JSON.
fn run_agents_report(args: &ClientArgs, cli: &CliConfig) -> Result<(), CliError> {
    let ctx = ClientContext::new(cli, args)?;
    // `config=true` asks the server to include per-agent config options.
    let response = ctx.get(&format!("{API_PREFIX}/agents?config=true"))?;
    let status = response.status();
    let body = response.text()?;
    if !status.is_success() {
        print_error_body(&body)?;
        return Err(CliError::HttpStatus(status));
    }
    let listing: AgentListApiResponse = serde_json::from_str(&body)?;
    let report = build_agent_config_report(listing, &ctx.endpoint);
    write_stdout_line(&serde_json::to_string_pretty(&report)?)
}
/// Turns the raw agent listing into the `agents report` document, bucketing
/// each parseable config option into models / modes / thought levels.
fn build_agent_config_report(input: AgentListApiResponse, endpoint: &str) -> AgentConfigReport {
    // Milliseconds since the Unix epoch; a clock before the epoch degrades to 0.
    let generated_at_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |duration| duration.as_millis());
    let mut agents = Vec::with_capacity(input.agents.len());
    for agent in input.agents {
        let mut models = CategoryAccumulator::default();
        let mut modes = CategoryAccumulator::default();
        let mut thought_levels = CategoryAccumulator::default();
        for raw in agent.config_options.unwrap_or_default() {
            // Best effort: options we cannot parse are skipped, not fatal.
            let option = match serde_json::from_value::<RawConfigOption>(raw) {
                Ok(option) => option,
                Err(_) => continue,
            };
            // `category` is authoritative; fall back to the option id.
            let category = option
                .category
                .as_deref()
                .or(option.id.as_deref())
                .and_then(classify_report_category);
            match category {
                Some(ConfigReportCategory::Model) => models.absorb(&option),
                Some(ConfigReportCategory::Mode) => modes.absorb(&option),
                Some(ConfigReportCategory::ThoughtLevel) => thought_levels.absorb(&option),
                None => {}
            }
        }
        agents.push(AgentConfigReportEntry {
            id: agent.id,
            installed: agent.installed,
            config_error: agent.config_error,
            models: models.into_report(),
            modes: modes.into_report(),
            thought_levels: thought_levels.into_report(),
        });
    }
    AgentConfigReport {
        generated_at_ms,
        endpoint: endpoint.to_string(),
        agents,
    }
}
/// Maps a category/id label onto a report category, accepting common aliases.
/// Labels are lowercased and '-'/' ' become '_' so "Thought Level" and
/// "thought-level" both match; anything unrecognized yields `None`.
fn classify_report_category(raw: &str) -> Option<ConfigReportCategory> {
    let normalized = raw
        .trim()
        .to_ascii_lowercase()
        .replace('-', "_")
        .replace(' ', "_");
    if matches!(normalized.as_str(), "model" | "model_id") {
        Some(ConfigReportCategory::Model)
    } else if matches!(normalized.as_str(), "mode" | "agent_mode") {
        Some(ConfigReportCategory::Mode)
    } else if matches!(
        normalized.as_str(),
        "thought" | "thoughtlevel" | "thought_level" | "thinking" | "thinking_level" | "reasoning"
            | "reasoning_effort"
    ) {
        Some(ConfigReportCategory::ThoughtLevel)
    } else {
        None
    }
}
/// Renders a JSON config value as a string: strings pass through unchanged,
/// null/absent become `None`, anything else uses its compact JSON rendering.
fn config_value_to_string(value: Option<&Value>) -> Option<String> {
    let value = value?;
    match value {
        Value::String(text) => Some(text.clone()),
        Value::Null => None,
        other => Some(other.to_string()),
    }
}
fn call_acp_extension(ctx: &ClientContext, method: &str, params: Value) -> Result<Value, CliError> {
let server_id = unique_cli_server_id("cli-ext");
let initialize_path = build_acp_server_path(&server_id, Some("mock"))?;
@ -1219,4 +1439,96 @@ mod tests {
.expect("build request");
assert!(request.headers().get("last-event-id").is_none());
}
/// Each supported alias maps onto its category; unknown labels yield `None`.
#[test]
fn classify_report_category_supports_common_aliases() {
    assert!(matches!(
        classify_report_category("model"),
        Some(ConfigReportCategory::Model)
    ));
    assert!(matches!(
        classify_report_category("mode"),
        Some(ConfigReportCategory::Mode)
    ));
    assert!(matches!(
        classify_report_category("thought_level"),
        Some(ConfigReportCategory::ThoughtLevel)
    ));
    // Alias coverage: reasoning-style labels also classify as thought level.
    assert!(matches!(
        classify_report_category("reasoning_effort"),
        Some(ConfigReportCategory::ThoughtLevel)
    ));
    assert!(classify_report_category("arbitrary").is_none());
}
/// End-to-end fixture test: one agent carrying a model, a mode, and a
/// thought-level option is reported with the expected current values and
/// with every choice present in the corresponding value list.
#[test]
fn build_agent_config_report_extracts_model_mode_and_thought() {
    let response = AgentListApiResponse {
        agents: vec![AgentListApiAgent {
            id: "codex".to_string(),
            installed: true,
            config_error: None,
            config_options: Some(vec![
                json!({
                    "id": "model",
                    "category": "model",
                    "currentValue": "gpt-5",
                    "options": [
                        {"value": "gpt-5", "name": "GPT-5"},
                        {"value": "gpt-5-mini", "name": "GPT-5 mini"}
                    ]
                }),
                json!({
                    "id": "mode",
                    "category": "mode",
                    "currentValue": "default",
                    "options": [
                        {"value": "default", "name": "Default"},
                        {"value": "plan", "name": "Plan"}
                    ]
                }),
                json!({
                    "id": "thought",
                    "category": "thought_level",
                    "currentValue": "medium",
                    "options": [
                        {"value": "low", "name": "Low"},
                        {"value": "medium", "name": "Medium"},
                        {"value": "high", "name": "High"}
                    ]
                }),
            ]),
        }],
    };
    let report = build_agent_config_report(response, "http://127.0.0.1:2468");
    let agent = report.agents.first().expect("agent report");
    assert_eq!(agent.id, "codex");
    // The fixture's currentValue fields surface unchanged in each category.
    assert_eq!(agent.models.current_value.as_deref(), Some("gpt-5"));
    assert_eq!(agent.modes.current_value.as_deref(), Some("default"));
    assert_eq!(
        agent.thought_levels.current_value.as_deref(),
        Some("medium")
    );
    let model_values: Vec<&str> = agent
        .models
        .values
        .iter()
        .map(|item| item.value.as_str())
        .collect();
    assert!(model_values.contains(&"gpt-5"));
    assert!(model_values.contains(&"gpt-5-mini"));
    let thought_values: Vec<&str> = agent
        .thought_levels
        .values
        .iter()
        .map(|item| item.value.as_str())
        .collect();
    assert!(thought_values.contains(&"low"));
    assert!(thought_values.contains(&"medium"));
    assert!(thought_values.contains(&"high"));
}
}

View file

@ -8,7 +8,7 @@ use base64::Engine;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, Command};
use tokio::sync::{broadcast, Mutex, RwLock, Semaphore};
use tokio::sync::{broadcast, Mutex, RwLock};
use sandbox_agent_error::SandboxError;
@ -119,7 +119,6 @@ pub struct ProcessRuntime {
struct ProcessRuntimeInner {
next_id: AtomicU64,
processes: RwLock<HashMap<String, Arc<ManagedProcess>>>,
run_once_semaphore: Semaphore,
}
#[derive(Debug)]
@ -183,9 +182,6 @@ impl ProcessRuntime {
inner: Arc::new(ProcessRuntimeInner {
next_id: AtomicU64::new(1),
processes: RwLock::new(HashMap::new()),
run_once_semaphore: Semaphore::new(
ProcessRuntimeConfig::default().max_concurrent_processes,
),
}),
}
}
@ -328,14 +324,6 @@ impl ProcessRuntime {
});
}
let _permit =
self.inner
.run_once_semaphore
.try_acquire()
.map_err(|_| SandboxError::Conflict {
message: "too many concurrent run_once operations".to_string(),
})?;
let config = self.get_config().await;
let mut timeout_ms = spec.timeout_ms.unwrap_or(config.default_run_timeout_ms);
if timeout_ms == 0 {
@ -358,6 +346,9 @@ impl ProcessRuntime {
cmd.current_dir(cwd);
}
if !spec.env.contains_key("TERM") {
cmd.env("TERM", "xterm-256color");
}
for (key, value) in &spec.env {
cmd.env(key, value);
}
@ -622,10 +613,6 @@ impl ProcessRuntime {
cmd.current_dir(cwd);
}
// Default TERM for TTY processes so tools like tmux, vim, etc. work out of the box.
if !spec.env.contains_key("TERM") {
cmd.env("TERM", "xterm-256color");
}
for (key, value) in &spec.env {
cmd.env(key, value);
}

View file

@ -50,12 +50,6 @@ pub use self::types::*;
const APPLICATION_JSON: &str = "application/json";
const TEXT_EVENT_STREAM: &str = "text/event-stream";
const CHANNEL_K8S_IO_PROTOCOL: &str = "channel.k8s.io";
const CH_STDIN: u8 = 0;
const CH_STDOUT: u8 = 1;
const CH_STATUS: u8 = 3;
const CH_RESIZE: u8 = 4;
const CH_CLOSE: u8 = 255;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BrandingMode {
@ -202,6 +196,10 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
.route("/processes/:id/kill", post(post_v1_process_kill))
.route("/processes/:id/logs", get(get_v1_process_logs))
.route("/processes/:id/input", post(post_v1_process_input))
.route(
"/processes/:id/terminal/resize",
post(post_v1_process_terminal_resize),
)
.route(
"/processes/:id/terminal/ws",
get(get_v1_process_terminal_ws),
@ -346,6 +344,7 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
delete_v1_process,
get_v1_process_logs,
post_v1_process_input,
post_v1_process_terminal_resize,
get_v1_process_terminal_ws,
get_v1_config_mcp,
put_v1_config_mcp,
@ -395,6 +394,8 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
ProcessInputRequest,
ProcessInputResponse,
ProcessSignalQuery,
ProcessTerminalResizeRequest,
ProcessTerminalResizeResponse,
AcpPostQuery,
AcpServerInfo,
AcpServerListResponse,
@ -1493,15 +1494,12 @@ async fn get_v1_process_logs(
since,
};
if query.follow.unwrap_or(false) {
// Subscribe before reading history to avoid losing entries between the
// two operations. Entries are deduplicated by sequence number below.
let rx = runtime.subscribe_logs(&id).await?;
let entries = runtime.logs(&id, filter).await?;
let response_entries: Vec<ProcessLogEntry> =
entries.iter().cloned().map(map_process_log_line).collect();
let last_replay_seq = response_entries.last().map(|e| e.sequence).unwrap_or(0);
let entries = runtime.logs(&id, filter).await?;
let response_entries: Vec<ProcessLogEntry> =
entries.iter().cloned().map(map_process_log_line).collect();
if query.follow.unwrap_or(false) {
let rx = runtime.subscribe_logs(&id).await?;
let replay_stream = stream::iter(response_entries.into_iter().map(|entry| {
Ok::<axum::response::sse::Event, Infallible>(
axum::response::sse::Event::default()
@ -1517,9 +1515,6 @@ async fn get_v1_process_logs(
async move {
match item {
Ok(line) => {
if line.sequence <= last_replay_seq {
return None;
}
let entry = map_process_log_line(line);
if process_log_matches(&entry, requested_stream_copy) {
Some(Ok(axum::response::sse::Event::default()
@ -1544,10 +1539,6 @@ async fn get_v1_process_logs(
return Ok(response.into_response());
}
let entries = runtime.logs(&id, filter).await?;
let response_entries: Vec<ProcessLogEntry> =
entries.iter().cloned().map(map_process_log_line).collect();
Ok(Json(ProcessLogsResponse {
process_id: id,
stream: requested_stream,
@ -1601,13 +1592,51 @@ async fn post_v1_process_input(
Ok(Json(ProcessInputResponse { bytes_written }))
}
/// Resize a process terminal.
///
/// Sets the PTY window size (columns and rows) for a tty-mode process and
/// sends SIGWINCH so the child process can adapt.
#[utoipa::path(
post,
path = "/v1/processes/{id}/terminal/resize",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
request_body = ProcessTerminalResizeRequest,
responses(
(status = 200, description = "Resize accepted", body = ProcessTerminalResizeResponse),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 409, description = "Not a terminal process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_terminal_resize(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(body): Json<ProcessTerminalResizeRequest>,
) -> Result<Json<ProcessTerminalResizeResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
state
.process_runtime()
.resize_terminal(&id, body.cols, body.rows)
.await?;
Ok(Json(ProcessTerminalResizeResponse {
cols: body.cols,
rows: body.rows,
}))
}
/// Open an interactive WebSocket terminal session.
///
/// Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
/// `access_token` query param for browser-based auth (WebSocket API cannot
/// send custom headers). Uses the `channel.k8s.io` binary subprotocol:
/// channel 0 stdin, channel 1 stdout, channel 3 status JSON, channel 4 resize,
/// and channel 255 close.
/// send custom headers). Streams raw PTY output as binary frames and accepts
/// JSON control frames for input, resize, and close.
#[utoipa::path(
get,
path = "/v1/processes/{id}/terminal/ws",
@ -1643,16 +1672,23 @@ async fn get_v1_process_terminal_ws(
}
Ok(ws
.protocols([CHANNEL_K8S_IO_PROTOCOL])
.on_upgrade(move |socket| process_terminal_ws_session(socket, runtime, id))
.into_response())
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TerminalResizePayload {
cols: u16,
rows: u16,
#[serde(tag = "type", rename_all = "camelCase")]
enum TerminalClientFrame {
Input {
data: String,
#[serde(default)]
encoding: Option<String>,
},
Resize {
cols: u16,
rows: u16,
},
Close,
}
async fn process_terminal_ws_session(
@ -1660,7 +1696,7 @@ async fn process_terminal_ws_session(
runtime: Arc<ProcessRuntime>,
id: String,
) {
let _ = send_status_json(
let _ = send_ws_json(
&mut socket,
json!({
"type": "ready",
@ -1672,8 +1708,7 @@ async fn process_terminal_ws_session(
let mut log_rx = match runtime.subscribe_logs(&id).await {
Ok(rx) => rx,
Err(err) => {
let _ = send_status_error(&mut socket, &err.to_string()).await;
let _ = send_close_signal(&mut socket).await;
let _ = send_ws_error(&mut socket, &err.to_string()).await;
let _ = socket.close().await;
return;
}
@ -1684,57 +1719,43 @@ async fn process_terminal_ws_session(
tokio::select! {
ws_in = socket.recv() => {
match ws_in {
Some(Ok(Message::Binary(bytes))) => {
let Some((&channel, payload)) = bytes.split_first() else {
let _ = send_status_error(&mut socket, "invalid terminal frame: missing channel byte").await;
continue;
};
match channel {
CH_STDIN => {
let input = payload.to_vec();
let max_input = runtime.max_input_bytes().await;
if input.len() > max_input {
let _ = send_status_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await;
continue;
}
if let Err(err) = runtime.write_input(&id, &input).await {
let _ = send_status_error(&mut socket, &err.to_string()).await;
}
}
CH_RESIZE => {
let resize = match serde_json::from_slice::<TerminalResizePayload>(payload) {
Ok(resize) => resize,
Some(Ok(Message::Binary(_))) => {
let _ = send_ws_error(&mut socket, "binary input is not supported; use text JSON frames").await;
}
Some(Ok(Message::Text(text))) => {
let parsed = serde_json::from_str::<TerminalClientFrame>(&text);
match parsed {
Ok(TerminalClientFrame::Input { data, encoding }) => {
let input = match decode_input_bytes(&data, encoding.as_deref().unwrap_or("utf8")) {
Ok(input) => input,
Err(err) => {
let _ = send_status_error(&mut socket, &format!("invalid resize payload: {err}")).await;
let _ = send_ws_error(&mut socket, &err.to_string()).await;
continue;
}
};
if let Err(err) = runtime
.resize_terminal(&id, resize.cols, resize.rows)
.await
{
let _ = send_status_error(&mut socket, &err.to_string()).await;
let max_input = runtime.max_input_bytes().await;
if input.len() > max_input {
let _ = send_ws_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await;
continue;
}
if let Err(err) = runtime.write_input(&id, &input).await {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
}
}
CH_CLOSE => {
let _ = send_close_signal(&mut socket).await;
Ok(TerminalClientFrame::Resize { cols, rows }) => {
if let Err(err) = runtime.resize_terminal(&id, cols, rows).await {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
}
}
Ok(TerminalClientFrame::Close) => {
let _ = socket.close().await;
break;
}
_ => {
let _ = send_status_error(&mut socket, &format!("unsupported terminal channel: {channel}")).await;
Err(err) => {
let _ = send_ws_error(&mut socket, &format!("invalid terminal frame: {err}")).await;
}
}
}
Some(Ok(Message::Text(_))) => {
let _ = send_status_error(
&mut socket,
"text frames are not supported; use channel.k8s.io binary frames",
)
.await;
}
Some(Ok(Message::Ping(payload))) => {
let _ = socket.send(Message::Pong(payload)).await;
}
@ -1754,7 +1775,7 @@ async fn process_terminal_ws_session(
use base64::Engine;
BASE64_ENGINE.decode(&line.data).unwrap_or_default()
};
if send_channel_frame(&mut socket, CH_STDOUT, bytes).await.is_err() {
if socket.send(Message::Binary(bytes)).await.is_err() {
break;
}
}
@ -1765,7 +1786,7 @@ async fn process_terminal_ws_session(
_ = exit_poll.tick() => {
if let Ok(snapshot) = runtime.snapshot(&id).await {
if snapshot.status == ProcessStatus::Exited {
let _ = send_status_json(
let _ = send_ws_json(
&mut socket,
json!({
"type": "exit",
@ -1773,7 +1794,6 @@ async fn process_terminal_ws_session(
}),
)
.await;
let _ = send_close_signal(&mut socket).await;
let _ = socket.close().await;
break;
}
@ -1783,30 +1803,17 @@ async fn process_terminal_ws_session(
}
}
async fn send_channel_frame(
socket: &mut WebSocket,
channel: u8,
payload: impl Into<Vec<u8>>,
) -> Result<(), ()> {
let mut frame = vec![channel];
frame.extend(payload.into());
async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
socket
.send(Message::Binary(frame.into()))
.send(Message::Text(
serde_json::to_string(&payload).map_err(|_| ())?,
))
.await
.map_err(|_| ())
}
async fn send_status_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
send_channel_frame(
socket,
CH_STATUS,
serde_json::to_vec(&payload).map_err(|_| ())?,
)
.await
}
async fn send_status_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
send_status_json(
async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
send_ws_json(
socket,
json!({
"type": "error",
@ -1816,10 +1823,6 @@ async fn send_status_error(socket: &mut WebSocket, message: &str) -> Result<(),
.await
}
async fn send_close_signal(socket: &mut WebSocket) -> Result<(), ()> {
send_channel_frame(socket, CH_CLOSE, Vec::<u8>::new()).await
}
#[utoipa::path(
get,
path = "/v1/config/mcp",

View file

@ -71,10 +71,7 @@ fn percent_decode(input: &str) -> String {
let mut i = 0;
while i < bytes.len() {
if bytes[i] == b'%' && i + 2 < bytes.len() {
if let (Some(hi), Some(lo)) = (
hex_nibble(bytes[i + 1]),
hex_nibble(bytes[i + 2]),
) {
if let (Some(hi), Some(lo)) = (hex_nibble(bytes[i + 1]), hex_nibble(bytes[i + 2])) {
output.push((hi << 4) | lo);
i += 3;
continue;
@ -147,6 +144,9 @@ pub(super) fn fallback_config_options(agent: AgentId) -> Vec<Value> {
AgentId::Codex => CODEX.clone(),
AgentId::Opencode => OPENCODE.clone(),
AgentId::Cursor => CURSOR.clone(),
// Amp returns empty configOptions from session/new but exposes modes via
// the `modes` field. The model is hardcoded. Modes discovered from ACP
// session/new response (amp-acp v0.7.0).
AgentId::Amp => vec![
json!({
"id": "model",
@ -163,12 +163,10 @@ pub(super) fn fallback_config_options(agent: AgentId) -> Vec<Value> {
"name": "Mode",
"category": "mode",
"type": "select",
"currentValue": "smart",
"currentValue": "default",
"options": [
{ "value": "smart", "name": "Smart" },
{ "value": "deep", "name": "Deep" },
{ "value": "free", "name": "Free" },
{ "value": "rush", "name": "Rush" }
{ "value": "default", "name": "Default" },
{ "value": "bypass", "name": "Bypass" }
]
}),
],
@ -182,41 +180,76 @@ pub(super) fn fallback_config_options(agent: AgentId) -> Vec<Value> {
{ "value": "default", "name": "Default" }
]
})],
AgentId::Mock => vec![json!({
"id": "model",
"name": "Model",
"category": "model",
"type": "select",
"currentValue": "mock",
"options": [
{ "value": "mock", "name": "Mock" }
]
})],
AgentId::Mock => vec![
json!({
"id": "model",
"name": "Model",
"category": "model",
"type": "select",
"currentValue": "mock",
"options": [
{ "value": "mock", "name": "Mock" },
{ "value": "mock-fast", "name": "Mock Fast" }
]
}),
json!({
"id": "mode",
"name": "Mode",
"category": "mode",
"type": "select",
"currentValue": "normal",
"options": [
{ "value": "normal", "name": "Normal" },
{ "value": "plan", "name": "Plan" }
]
}),
json!({
"id": "thought_level",
"name": "Thought Level",
"category": "thought_level",
"type": "select",
"currentValue": "low",
"options": [
{ "value": "low", "name": "Low" },
{ "value": "medium", "name": "Medium" },
{ "value": "high", "name": "High" }
]
}),
],
}
}
/// Parse an agent config JSON file (from `scripts/agent-configs/resources/`) into
/// ACP `SessionConfigOption` values. The JSON format is:
/// ```json
/// { "defaultModel": "...", "models": [{id, name}], "defaultMode?": "...", "modes?": [{id, name}] }
/// {
/// "defaultModel": "...", "models": [{id, name}],
/// "defaultMode?": "...", "modes?": [{id, name}],
/// "defaultThoughtLevel?": "...", "thoughtLevels?": [{id, name}]
/// }
/// ```
///
/// Note: Claude and Codex don't report configOptions from `session/new`, so these
/// JSON resource files are the source of truth for the capabilities report.
/// Claude modes (plan, default) were discovered via manual ACP probing —
/// `session/set_mode` works but `session/set_config_option` is not implemented.
/// Codex modes/thought levels were discovered from its `session/new` response.
fn parse_agent_config(json_str: &str) -> Vec<Value> {
#[derive(serde::Deserialize)]
struct AgentConfig {
#[serde(rename = "defaultModel")]
default_model: String,
models: Vec<ModelEntry>,
models: Vec<ConfigEntry>,
#[serde(rename = "defaultMode")]
default_mode: Option<String>,
modes: Option<Vec<ModeEntry>>,
modes: Option<Vec<ConfigEntry>>,
#[serde(rename = "defaultThoughtLevel")]
default_thought_level: Option<String>,
#[serde(rename = "thoughtLevels")]
thought_levels: Option<Vec<ConfigEntry>>,
}
#[derive(serde::Deserialize)]
struct ModelEntry {
id: String,
name: String,
}
#[derive(serde::Deserialize)]
struct ModeEntry {
struct ConfigEntry {
id: String,
name: String,
}
@ -242,7 +275,7 @@ fn parse_agent_config(json_str: &str) -> Vec<Value> {
"name": "Mode",
"category": "mode",
"type": "select",
"currentValue": config.default_mode.unwrap_or_else(|| modes[0].id.clone()),
"currentValue": config.default_mode.or_else(|| modes.first().map(|m| m.id.clone())).unwrap_or_default(),
"options": modes.iter().map(|m| json!({
"value": m.id,
"name": m.name,
@ -250,6 +283,20 @@ fn parse_agent_config(json_str: &str) -> Vec<Value> {
}));
}
if let Some(thought_levels) = config.thought_levels {
options.push(json!({
"id": "thought_level",
"name": "Thought Level",
"category": "thought_level",
"type": "select",
"currentValue": config.default_thought_level.or_else(|| thought_levels.first().map(|t| t.id.clone())).unwrap_or_default(),
"options": thought_levels.iter().map(|t| json!({
"value": t.id,
"name": t.name,
})).collect::<Vec<_>>(),
}));
}
options
}

View file

@ -512,6 +512,20 @@ pub struct ProcessSignalQuery {
pub wait_ms: Option<u64>,
}
/// Request body for `POST /v1/processes/{id}/terminal/resize`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeRequest {
    /// Terminal width in columns.
    pub cols: u16,
    /// Terminal height in rows.
    pub rows: u16,
}
/// Response for the terminal resize endpoint; echoes the accepted geometry.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeResponse {
    /// Terminal width in columns, as applied.
    pub cols: u16,
    /// Terminal height in rows, as applied.
    pub rows: u16,
}
#[derive(Debug, Clone, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessWsQuery {

View file

@ -65,8 +65,8 @@ impl LiveServer {
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let task = tokio::spawn(async move {
let server = axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(async {
let server =
axum::serve(listener, app.into_make_service()).with_graceful_shutdown(async {
let _ = shutdown_rx.await;
});

View file

@ -3,17 +3,8 @@ use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine;
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::tungstenite::Message;
// WebSocket subprotocol name sent in the `Sec-WebSocket-Protocol` header.
const CHANNEL_K8S_IO_PROTOCOL: &str = "channel.k8s.io";
// One-byte channel ids prefixing each binary frame (channel.k8s.io framing).
const CH_STDIN: u8 = 0; // client -> server: terminal input bytes
const CH_STDOUT: u8 = 1; // server -> client: pty output bytes
const CH_STATUS: u8 = 3; // server -> client: JSON status frames ("ready"/"exit"/"error")
const CH_RESIZE: u8 = 4; // client -> server: JSON resize payload, e.g. {"cols":120,"rows":40}
const CH_CLOSE: u8 = 255; // either direction: close marker; payload must be empty
async fn wait_for_exited(test_app: &TestApp, process_id: &str) {
for _ in 0..30 {
let (status, _, body) = send_request(
@ -57,19 +48,6 @@ async fn recv_ws_message(
.expect("websocket frame")
}
/// Builds a channel.k8s.io-style binary frame: a single channel-id byte
/// followed by the raw payload bytes.
fn make_channel_frame(channel: u8, payload: impl AsRef<[u8]>) -> Vec<u8> {
    std::iter::once(channel)
        .chain(payload.as_ref().iter().copied())
        .collect()
}
/// Splits a binary frame into its leading channel-id byte and the remaining
/// payload slice. Panics with "channel frame" if the frame is empty.
fn parse_channel_frame(bytes: &[u8]) -> (u8, &[u8]) {
    match bytes {
        [channel, payload @ ..] => (*channel, payload),
        [] => panic!("channel frame"),
    }
}
#[tokio::test]
async fn v1_processes_config_round_trip() {
let test_app = TestApp::new(AuthConfig::disabled());
@ -544,81 +522,54 @@ async fn v1_process_terminal_ws_e2e_is_deterministic() {
let process_id = create_body["id"].as_str().expect("process id").to_string();
let ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
let mut ws_request = ws_url.into_client_request().expect("ws request");
ws_request.headers_mut().insert(
"Sec-WebSocket-Protocol",
HeaderValue::from_static(CHANNEL_K8S_IO_PROTOCOL),
);
let (mut ws, response) = connect_async(ws_request).await.expect("connect websocket");
assert_eq!(
response
.headers()
.get("Sec-WebSocket-Protocol")
.and_then(|value| value.to_str().ok()),
Some(CHANNEL_K8S_IO_PROTOCOL)
);
let (mut ws, _) = connect_async(&ws_url).await.expect("connect websocket");
let ready = recv_ws_message(&mut ws).await;
let ready_bytes = ready.into_data();
let (ready_channel, ready_payload) = parse_channel_frame(&ready_bytes);
assert_eq!(ready_channel, CH_STATUS);
let ready_payload: Value = serde_json::from_slice(ready_payload).expect("ready json");
let ready_payload: Value =
serde_json::from_str(ready.to_text().expect("ready text frame")).expect("ready json");
assert_eq!(ready_payload["type"], "ready");
assert_eq!(ready_payload["processId"], process_id);
ws.send(Message::Binary(
make_channel_frame(CH_STDIN, b"hello from ws\n").into(),
ws.send(Message::Text(
json!({
"type": "input",
"data": "hello from ws\n"
})
.to_string(),
))
.await
.expect("send input frame");
ws.send(Message::Binary(
make_channel_frame(CH_RESIZE, br#"{"cols":120,"rows":40}"#).into(),
))
.await
.expect("send resize frame");
let mut saw_stdout = false;
let mut saw_binary_output = false;
let mut saw_exit = false;
let mut saw_close = false;
for _ in 0..10 {
let frame = recv_ws_message(&mut ws).await;
match frame {
Message::Binary(bytes) => {
let (channel, payload) = parse_channel_frame(&bytes);
match channel {
CH_STDOUT => {
let text = String::from_utf8_lossy(payload);
if text.contains("got:hello from ws") {
saw_stdout = true;
}
}
CH_STATUS => {
let payload: Value =
serde_json::from_slice(payload).expect("ws status json");
if payload["type"] == "exit" {
saw_exit = true;
} else {
assert_ne!(payload["type"], "error");
}
}
CH_CLOSE => {
assert!(payload.is_empty(), "close channel payload must be empty");
saw_close = true;
break;
}
other => panic!("unexpected websocket channel: {other}"),
let text = String::from_utf8_lossy(&bytes);
if text.contains("got:hello from ws") {
saw_binary_output = true;
}
}
Message::Text(text) => {
let payload: Value = serde_json::from_str(&text).expect("ws json");
if payload["type"] == "exit" {
saw_exit = true;
break;
}
assert_ne!(payload["type"], "error");
}
Message::Close(_) => break,
Message::Ping(_) | Message::Pong(_) => {}
_ => {}
}
}
assert!(saw_stdout, "expected pty stdout over websocket");
assert!(saw_exit, "expected exit status frame over websocket");
assert!(saw_close, "expected close channel frame over websocket");
assert!(
saw_binary_output,
"expected pty binary output over websocket"
);
assert!(saw_exit, "expected exit control frame over websocket");
let _ = ws.close(None).await;
@ -668,38 +619,19 @@ async fn v1_process_terminal_ws_auth_e2e() {
let auth_ws_url = live_server.ws_url(&format!(
"/v1/processes/{process_id}/terminal/ws?access_token={token}"
));
let mut ws_request = auth_ws_url.into_client_request().expect("ws request");
ws_request.headers_mut().insert(
"Sec-WebSocket-Protocol",
HeaderValue::from_static(CHANNEL_K8S_IO_PROTOCOL),
);
let (mut ws, response) = connect_async(ws_request)
let (mut ws, _) = connect_async(&auth_ws_url)
.await
.expect("authenticated websocket handshake");
assert_eq!(
response
.headers()
.get("Sec-WebSocket-Protocol")
.and_then(|value| value.to_str().ok()),
Some(CHANNEL_K8S_IO_PROTOCOL)
);
let ready = recv_ws_message(&mut ws).await;
let ready_bytes = ready.into_data();
let (ready_channel, ready_payload) = parse_channel_frame(&ready_bytes);
assert_eq!(ready_channel, CH_STATUS);
let ready_payload: Value = serde_json::from_slice(ready_payload).expect("ready json");
let ready_payload: Value =
serde_json::from_str(ready.to_text().expect("ready text frame")).expect("ready json");
assert_eq!(ready_payload["type"], "ready");
assert_eq!(ready_payload["processId"], process_id);
let _ = ws
.send(Message::Binary(make_channel_frame(CH_CLOSE, []).into()))
.send(Message::Text(json!({ "type": "close" }).to_string()))
.await;
let close = recv_ws_message(&mut ws).await;
let close_bytes = close.into_data();
let (close_channel, close_payload) = parse_channel_frame(&close_bytes);
assert_eq!(close_channel, CH_CLOSE);
assert!(close_payload.is_empty());
let _ = ws.close(None).await;
let kill_response = http