feat: desktop computer-use APIs with neko-based streaming

Add desktop runtime management (Xvfb, openbox, dbus), screen capture,
mouse/keyboard input, and video streaming via neko binary extracted
from the m1k1o/neko container. Includes Docker test rig, TypeScript SDK
desktop support, and inspector Desktop tab.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-16 17:56:39 -07:00
parent 3895e34bdb
commit 33821d8660
66 changed files with 13190 additions and 1135 deletions

View file

@ -11,6 +11,7 @@ mod build_version {
include!(concat!(env!("OUT_DIR"), "/version.rs"));
}
use crate::desktop_install::{install_desktop, DesktopInstallRequest, DesktopPackageManager};
use crate::router::{
build_router_with_state, shutdown_servers, AppState, AuthConfig, BrandingMode,
};
@ -75,6 +76,8 @@ pub enum Command {
Server(ServerArgs),
/// Call the HTTP API without writing client code.
Api(ApiArgs),
/// Install first-party runtime dependencies.
Install(InstallArgs),
/// EXPERIMENTAL: OpenCode compatibility layer (disabled until ACP Phase 7).
Opencode(OpencodeArgs),
/// Manage the sandbox-agent background daemon.
@ -118,6 +121,12 @@ pub struct ApiArgs {
command: ApiCommand,
}
// Arguments for the top-level `install` command; it only carries the nested
// install subcommand. (Plain `//` comments are used so clap's derive does not
// pick this text up as help output.)
#[derive(Args, Debug)]
pub struct InstallArgs {
// The install subcommand to run; dispatched by `run_install`.
#[command(subcommand)]
command: InstallCommand,
}
#[derive(Args, Debug)]
pub struct OpencodeArgs {
#[arg(long, short = 'H', default_value = DEFAULT_HOST)]
@ -156,6 +165,12 @@ pub struct DaemonArgs {
command: DaemonCommand,
}
// Subcommands available under `install`. `run_install` matches on this enum
// and forwards each variant to its implementation.
#[derive(Subcommand, Debug)]
pub enum InstallCommand {
/// Install desktop runtime dependencies.
Desktop(InstallDesktopArgs),
}
#[derive(Subcommand, Debug)]
pub enum DaemonCommand {
/// Start the daemon in the background.
@ -310,6 +325,18 @@ pub struct InstallAgentArgs {
agent_process_version: Option<String>,
}
// CLI flags for `install desktop`. `install_desktop_local` copies them into a
// `DesktopInstallRequest`. The struct-level note stays a plain `//` comment so
// clap does not use it as the command description; the field-level `///`
// comments below are intentionally picked up by clap as `--help` text.
#[derive(Args, Debug)]
pub struct InstallDesktopArgs {
    /// Skip the interactive confirmation prompt.
    #[arg(long, default_value_t = false)]
    yes: bool,
    /// Print the package list and install command without installing anything.
    #[arg(long, default_value_t = false)]
    print_only: bool,
    /// Package manager to use instead of auto-detecting one from PATH.
    #[arg(long, value_enum)]
    package_manager: Option<DesktopPackageManager>,
    /// Do not install the DejaVu font package.
    #[arg(long, default_value_t = false)]
    no_fonts: bool,
}
#[derive(Args, Debug)]
pub struct CredentialsExtractArgs {
#[arg(long, short = 'a', value_enum)]
@ -405,6 +432,7 @@ pub fn run_command(command: &Command, cli: &CliConfig) -> Result<(), CliError> {
match command {
Command::Server(args) => run_server(cli, args),
Command::Api(subcommand) => run_api(&subcommand.command, cli),
Command::Install(subcommand) => run_install(&subcommand.command),
Command::Opencode(args) => run_opencode(cli, args),
Command::Daemon(subcommand) => run_daemon(&subcommand.command, cli),
Command::InstallAgent(args) => install_agent_local(args),
@ -413,6 +441,12 @@ pub fn run_command(command: &Command, cli: &CliConfig) -> Result<(), CliError> {
}
}
/// Routes an `install` subcommand to the function that implements it.
fn run_install(command: &InstallCommand) -> Result<(), CliError> {
    // `InstallCommand` has a single variant, so this pattern is irrefutable;
    // adding a variant turns this line into a compile error that must be handled.
    let InstallCommand::Desktop(desktop_args) = command;
    install_desktop_local(desktop_args)
}
fn run_server(cli: &CliConfig, server: &ServerArgs) -> Result<(), CliError> {
let auth = if let Some(token) = cli.token.clone() {
AuthConfig::with_token(token)
@ -477,6 +511,17 @@ fn run_api(command: &ApiCommand, cli: &CliConfig) -> Result<(), CliError> {
}
}
fn install_desktop_local(args: &InstallDesktopArgs) -> Result<(), CliError> {
install_desktop(DesktopInstallRequest {
yes: args.yes,
print_only: args.print_only,
package_manager: args.package_manager,
no_fonts: args.no_fonts,
})
.map(|_| ())
.map_err(CliError::Server)
}
fn run_agents(command: &AgentsCommand, cli: &CliConfig) -> Result<(), CliError> {
match command {
AgentsCommand::List(args) => {

View file

@ -0,0 +1,217 @@
use sandbox_agent_error::ProblemDetails;
use serde_json::{json, Map, Value};
use crate::desktop_types::{DesktopErrorInfo, DesktopProcessInfo};
/// A desktop-specific error that can be rendered either as a `ProblemDetails`
/// value (`to_problem_details`) or as a compact `DesktopErrorInfo`
/// (`to_error_info`). Instances are built through the named constructors on
/// the impl; all fields are private.
#[derive(Debug, Clone)]
pub struct DesktopProblem {
/// Numeric status copied into `ProblemDetails::status`.
status: u16,
/// Short human-readable title copied into `ProblemDetails::title`.
title: &'static str,
/// Machine-readable code; also used to build the `urn:sandbox-agent:error:` type.
code: &'static str,
/// Detail message shown to the caller.
message: String,
/// Names of missing dependencies; emitted as `missingDependencies` when non-empty.
missing_dependencies: Vec<String>,
/// Suggested install command; emitted as `installCommand` when present.
install_command: Option<String>,
/// Process information attached to the problem; emitted as `processes` when non-empty.
processes: Vec<DesktopProcessInfo>,
}
impl DesktopProblem {
pub fn unsupported_platform(message: impl Into<String>) -> Self {
Self::new(
501,
"Desktop Unsupported",
"desktop_unsupported_platform",
message,
)
}
pub fn dependencies_missing(
missing_dependencies: Vec<String>,
install_command: Option<String>,
processes: Vec<DesktopProcessInfo>,
) -> Self {
let mut message = if missing_dependencies.is_empty() {
"Desktop dependencies are not installed".to_string()
} else {
format!(
"Desktop dependencies are not installed: {}",
missing_dependencies.join(", ")
)
};
if let Some(command) = install_command.as_ref() {
message.push_str(&format!(
". Run `{command}` to install them, or install the required tools manually."
));
}
Self::new(
503,
"Desktop Dependencies Missing",
"desktop_dependencies_missing",
message,
)
.with_missing_dependencies(missing_dependencies)
.with_install_command(install_command)
.with_processes(processes)
}
pub fn runtime_inactive(message: impl Into<String>) -> Self {
Self::new(
409,
"Desktop Runtime Inactive",
"desktop_runtime_inactive",
message,
)
}
pub fn runtime_starting(message: impl Into<String>) -> Self {
Self::new(
409,
"Desktop Runtime Starting",
"desktop_runtime_starting",
message,
)
}
pub fn runtime_failed(
message: impl Into<String>,
install_command: Option<String>,
processes: Vec<DesktopProcessInfo>,
) -> Self {
Self::new(
503,
"Desktop Runtime Failed",
"desktop_runtime_failed",
message,
)
.with_install_command(install_command)
.with_processes(processes)
}
pub fn invalid_action(message: impl Into<String>) -> Self {
Self::new(
400,
"Desktop Invalid Action",
"desktop_invalid_action",
message,
)
}
pub fn screenshot_failed(
message: impl Into<String>,
processes: Vec<DesktopProcessInfo>,
) -> Self {
Self::new(
502,
"Desktop Screenshot Failed",
"desktop_screenshot_failed",
message,
)
.with_processes(processes)
}
pub fn input_failed(message: impl Into<String>, processes: Vec<DesktopProcessInfo>) -> Self {
Self::new(502, "Desktop Input Failed", "desktop_input_failed", message)
.with_processes(processes)
}
pub fn to_problem_details(&self) -> ProblemDetails {
let mut extensions = Map::new();
extensions.insert("code".to_string(), Value::String(self.code.to_string()));
if !self.missing_dependencies.is_empty() {
extensions.insert(
"missingDependencies".to_string(),
Value::Array(
self.missing_dependencies
.iter()
.cloned()
.map(Value::String)
.collect(),
),
);
}
if let Some(install_command) = self.install_command.as_ref() {
extensions.insert(
"installCommand".to_string(),
Value::String(install_command.clone()),
);
}
if !self.processes.is_empty() {
extensions.insert("processes".to_string(), json!(self.processes));
}
ProblemDetails {
type_: format!("urn:sandbox-agent:error:{}", self.code),
title: self.title.to_string(),
status: self.status,
detail: Some(self.message.clone()),
instance: None,
extensions,
}
}
pub fn to_error_info(&self) -> DesktopErrorInfo {
DesktopErrorInfo {
code: self.code.to_string(),
message: self.message.clone(),
}
}
pub fn code(&self) -> &'static str {
self.code
}
fn new(
status: u16,
title: &'static str,
code: &'static str,
message: impl Into<String>,
) -> Self {
Self {
status,
title,
code,
message: message.into(),
missing_dependencies: Vec::new(),
install_command: None,
processes: Vec::new(),
}
}
fn with_missing_dependencies(mut self, missing_dependencies: Vec<String>) -> Self {
self.missing_dependencies = missing_dependencies;
self
}
fn with_install_command(mut self, install_command: Option<String>) -> Self {
self.install_command = install_command;
self
}
fn with_processes(mut self, processes: Vec<DesktopProcessInfo>) -> Self {
self.processes = processes;
self
}
}
// Unit tests for `DesktopProblem`.
#[cfg(test)]
mod tests {
use super::*;
// The dependencies-missing problem must mention both the missing tools and
// the install command in its detail text, and expose the command as the
// `installCommand` extension.
#[test]
fn dependencies_missing_detail_includes_install_command() {
let problem = DesktopProblem::dependencies_missing(
vec!["Xvfb".to_string(), "openbox".to_string()],
Some("sandbox-agent install desktop --yes".to_string()),
Vec::new(),
);
let details = problem.to_problem_details();
let detail = details.detail.expect("detail");
assert!(detail.contains("Desktop dependencies are not installed: Xvfb, openbox"));
assert!(detail.contains("sandbox-agent install desktop --yes"));
assert_eq!(
details.extensions.get("installCommand"),
Some(&Value::String(
"sandbox-agent install desktop --yes".to_string()
))
);
}
}

View file

@ -0,0 +1,324 @@
use std::fmt;
use std::io::{self, Write};
use std::path::PathBuf;
use std::process::Command as ProcessCommand;
use clap::ValueEnum;
/// Sentence naming the distributions (and package managers) for which
/// automatic installation is supported. Reused by both support messages below.
const AUTOMATIC_INSTALL_SUPPORTED_DISTROS: &str =
"Automatic desktop dependency installation is supported on Debian/Ubuntu (apt), Fedora/RHEL (dnf), and Alpine (apk).";
/// Sentence naming the environments where automatic installation is not
/// available. Appended to the supported-distros sentence in install errors.
const AUTOMATIC_INSTALL_UNSUPPORTED_ENVS: &str =
"Automatic installation is not supported on macOS, Windows, or Linux distributions without apt, dnf, or apk.";
// Package managers the installer knows how to drive. Also usable as a CLI
// value via clap's `ValueEnum`; its `Display` impl prints `apt`, `dnf` or `apk`.
// (Plain `//` comments are used so clap does not treat them as value help.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum DesktopPackageManager {
// Installed through `apt-get`.
Apt,
// Installed through `dnf`.
Dnf,
// Installed through `apk`.
Apk,
}
/// Options accepted by `install_desktop`.
#[derive(Debug, Clone)]
pub struct DesktopInstallRequest {
/// When true, the confirmation prompt is skipped.
pub yes: bool,
/// When true, the plan is printed and nothing is installed.
pub print_only: bool,
/// Explicit package manager; `None` means detect one from `PATH`.
pub package_manager: Option<DesktopPackageManager>,
/// When true, the DejaVu font package is left out of the package list.
pub no_fonts: bool,
}
/// Message explaining that desktop APIs require Linux, followed by the list of
/// distributions with automatic install support.
pub(crate) fn desktop_platform_support_message() -> String {
    let supported = AUTOMATIC_INSTALL_SUPPORTED_DISTROS;
    let mut message = String::from("Desktop APIs are only supported on Linux. ");
    message.push_str(supported);
    message
}
/// The supported-distros sentence and the unsupported-environments sentence,
/// separated by a single space.
fn linux_install_support_message() -> String {
    [
        AUTOMATIC_INSTALL_SUPPORTED_DISTROS,
        AUTOMATIC_INSTALL_UNSUPPORTED_ENVS,
    ]
    .join(" ")
}
/// Installs the desktop runtime packages with the system package manager.
///
/// Steps, in order:
/// 1. Refuse to run on anything other than Linux.
/// 2. Use `request.package_manager`, or detect a package manager from `PATH`.
/// 3. Unless `request.print_only` is set, require root or an available `sudo`.
/// 4. Print the package manager, the package list and the equivalent command.
/// 5. Return early when `request.print_only` is set.
/// 6. Ask for confirmation unless `request.yes` is set, then run the install.
///
/// # Errors
/// Returns a human-readable message when the platform is not Linux, no
/// supported package manager is found, neither root nor `sudo` is available
/// for a real install, the user declines the prompt, or an install command
/// fails.
pub fn install_desktop(request: DesktopInstallRequest) -> Result<(), String> {
    if std::env::consts::OS != "linux" {
        return Err(format!(
            "desktop installation is only supported on Linux. {}",
            linux_install_support_message()
        ));
    }

    let package_manager = match request.package_manager {
        Some(value) => value,
        None => detect_package_manager().ok_or_else(|| {
            format!(
                "could not detect a supported package manager. {} Install the desktop dependencies manually on this distribution.",
                linux_install_support_message()
            )
        })?,
    };

    let packages = desktop_packages(package_manager, request.no_fonts);
    let is_root = running_as_root();
    let used_sudo = !is_root && find_binary("sudo").is_some();

    // Privileges are only needed to actually install. `print_only` must keep
    // working without root/sudo so users can see which packages to install
    // by hand, which is exactly what the error below tells them to do.
    if !request.print_only && !is_root && !used_sudo {
        return Err(
            "desktop installation requires root or sudo access; rerun as root or install dependencies manually"
                .to_string(),
        );
    }

    println!("Desktop package manager: {}", package_manager);
    println!("Desktop packages:");
    for package in &packages {
        println!(" - {package}");
    }
    println!("Install command:");
    println!(
        " {}",
        render_install_command(package_manager, used_sudo, &packages)
    );

    if request.print_only {
        return Ok(());
    }

    if !request.yes && !prompt_yes_no("Proceed with desktop dependency installation? [y/N] ")? {
        return Err("installation cancelled".to_string());
    }

    run_install_commands(package_manager, used_sudo, &packages)?;
    println!("Desktop dependencies installed.");
    Ok(())
}
/// Returns the first supported package manager whose binary is found on
/// `PATH`, probing `apt-get`, then `dnf`, then `apk`.
fn detect_package_manager() -> Option<DesktopPackageManager> {
    // Probe order matters: the first hit wins.
    const CANDIDATES: [(&str, DesktopPackageManager); 3] = [
        ("apt-get", DesktopPackageManager::Apt),
        ("dnf", DesktopPackageManager::Dnf),
        ("apk", DesktopPackageManager::Apk),
    ];
    CANDIDATES
        .iter()
        .find(|(binary, _)| find_binary(binary).is_some())
        .map(|&(_, manager)| manager)
}
/// Returns the package names to install for `package_manager`.
///
/// When `no_fonts` is true the DejaVu font package is left out; every other
/// package keeps its original position in the list.
fn desktop_packages(package_manager: DesktopPackageManager, no_fonts: bool) -> Vec<String> {
    // Font package name used by each supported package manager.
    const FONT_PACKAGES: [&str; 3] = ["fonts-dejavu-core", "dejavu-sans-fonts", "ttf-dejavu"];

    let names: [&str; 9] = match package_manager {
        DesktopPackageManager::Apt => [
            "xvfb",
            "openbox",
            "xdotool",
            "imagemagick",
            "ffmpeg",
            "x11-xserver-utils",
            "dbus-x11",
            "xauth",
            "fonts-dejavu-core",
        ],
        DesktopPackageManager::Dnf => [
            "xorg-x11-server-Xvfb",
            "openbox",
            "xdotool",
            "ImageMagick",
            "ffmpeg",
            "xrandr",
            "dbus-x11",
            "xauth",
            "dejavu-sans-fonts",
        ],
        DesktopPackageManager::Apk => [
            "xvfb",
            "openbox",
            "xdotool",
            "imagemagick",
            "ffmpeg",
            "xrandr",
            "dbus",
            "xauth",
            "ttf-dejavu",
        ],
    };

    names
        .iter()
        .copied()
        .filter(|name| !(no_fonts && FONT_PACKAGES.contains(name)))
        .map(str::to_string)
        .collect()
}
/// Renders the shell command line equivalent to what `run_install_commands`
/// executes, for display to the user. Each privileged command is prefixed
/// with `sudo ` when `used_sudo` is true.
fn render_install_command(
    package_manager: DesktopPackageManager,
    used_sudo: bool,
    packages: &[String],
) -> String {
    let sudo = match used_sudo {
        true => "sudo ",
        false => "",
    };
    let package_list = packages.join(" ");
    match package_manager {
        DesktopPackageManager::Apt => format!(
            "{sudo}apt-get update && {sudo}env DEBIAN_FRONTEND=noninteractive apt-get install -y {}",
            package_list
        ),
        DesktopPackageManager::Dnf => format!("{sudo}dnf install -y {}", package_list),
        DesktopPackageManager::Apk => format!("{sudo}apk add --no-cache {}", package_list),
    }
}
/// Executes the install for `package_manager`.
///
/// For apt this first runs `apt-get update`, then installs through `env` so
/// `DEBIAN_FRONTEND=noninteractive` is set for the install step. dnf and apk
/// run a single install command. Every command is wrapped with `sudo` when
/// `used_sudo` is true. The first failing command aborts the sequence.
fn run_install_commands(
    package_manager: DesktopPackageManager,
    used_sudo: bool,
    packages: &[String],
) -> Result<(), String> {
    // Fixed leading arguments followed by the package names.
    let install_args = |fixed: &[&str]| -> Vec<String> {
        fixed
            .iter()
            .map(|arg| arg.to_string())
            .chain(packages.iter().cloned())
            .collect()
    };

    let (program, args) = match package_manager {
        DesktopPackageManager::Apt => {
            let update = command_with_privilege(used_sudo, "apt-get", vec!["update".to_string()]);
            run_command(update)?;
            (
                "env",
                install_args(&["DEBIAN_FRONTEND=noninteractive", "apt-get", "install", "-y"]),
            )
        }
        DesktopPackageManager::Dnf => ("dnf", install_args(&["install", "-y"])),
        DesktopPackageManager::Apk => ("apk", install_args(&["add", "--no-cache"])),
    };

    run_command(command_with_privilege(used_sudo, program, args))
}
/// Returns the `(program, args)` pair to execute. With `used_sudo` the
/// program becomes `sudo` and the original program is moved to the front of
/// the argument list; otherwise the inputs are returned unchanged.
fn command_with_privilege(
    used_sudo: bool,
    program: &str,
    args: Vec<String>,
) -> (String, Vec<String>) {
    if !used_sudo {
        return (program.to_string(), args);
    }
    let sudo_args = std::iter::once(program.to_string())
        .chain(args)
        .collect();
    ("sudo".to_string(), sudo_args)
}
/// Runs `program` with `args` and waits for it to finish.
///
/// # Errors
/// Returns a message when the process cannot be started or when it exits
/// with a non-success status.
fn run_command((program, args): (String, Vec<String>)) -> Result<(), String> {
    let outcome = ProcessCommand::new(&program).args(&args).status();
    match outcome {
        Err(err) => Err(format!("failed to run `{program}`: {err}")),
        Ok(status) if status.success() => Ok(()),
        Ok(status) => Err(format!(
            "command `{}` exited with status {}",
            format_command(&program, &args),
            status
        )),
    }
}
/// Prints `prompt`, reads one line from stdin and reports whether the answer
/// was `y` or `yes` (case-insensitive, surrounding whitespace ignored).
/// Anything else, including an empty line, counts as "no".
fn prompt_yes_no(prompt: &str) -> Result<bool, String> {
    print!("{prompt}");
    // The prompt has no trailing newline, so flush to make it visible.
    if let Err(err) = io::stdout().flush() {
        return Err(format!("failed to flush prompt: {err}"));
    }

    let mut answer = String::new();
    if let Err(err) = io::stdin().read_line(&mut answer) {
        return Err(format!("failed to read confirmation: {err}"));
    }

    let answer = answer.trim().to_ascii_lowercase();
    Ok(answer == "y" || answer == "yes")
}
/// Reports whether the effective user id is 0. Always false on non-Unix targets.
fn running_as_root() -> bool {
    #[cfg(unix)]
    {
        // SAFETY: `geteuid` takes no arguments and only reads the effective
        // user id of the calling process.
        unsafe { libc::geteuid() == 0 }
    }
    #[cfg(not(unix))]
    {
        false
    }
}
/// Searches each directory listed in `PATH` for a regular file called `name`
/// and returns the first match. Returns `None` when `PATH` is unset or no
/// directory contains such a file.
fn find_binary(name: &str) -> Option<PathBuf> {
    let search_path = std::env::var_os("PATH")?;
    std::env::split_paths(&search_path)
        .map(|dir| dir.join(name))
        .find(|candidate| candidate.is_file())
}
/// Joins the program and its arguments with single spaces for display in
/// error messages. No quoting or escaping is applied.
fn format_command(program: &str, args: &[String]) -> String {
    std::iter::once(program)
        .chain(args.iter().map(String::as_str))
        .collect::<Vec<_>>()
        .join(" ")
}
/// Prints the short lowercase name of the package manager (`apt`, `dnf`, `apk`).
impl fmt::Display for DesktopPackageManager {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            DesktopPackageManager::Apt => "apt",
            DesktopPackageManager::Dnf => "dnf",
            DesktopPackageManager::Apk => "apk",
        };
        f.write_str(label)
    }
}
// Unit tests for the pure helpers of the desktop installer. Nothing here
// runs a package manager.
#[cfg(test)]
mod tests {
use super::*;
// The platform message must state the Linux requirement and name every
// supported distribution/package manager pair.
#[test]
fn desktop_platform_support_message_mentions_linux_and_supported_distros() {
let message = desktop_platform_support_message();
assert!(message.contains("only supported on Linux"));
assert!(message.contains("Debian/Ubuntu (apt)"));
assert!(message.contains("Fedora/RHEL (dnf)"));
assert!(message.contains("Alpine (apk)"));
}
// The install message must cover both the supported distributions and the
// environments where automatic installation is unavailable.
#[test]
fn linux_install_support_message_mentions_unsupported_environments() {
let message = linux_install_support_message();
assert!(message.contains("Debian/Ubuntu (apt)"));
assert!(message.contains("Fedora/RHEL (dnf)"));
assert!(message.contains("Alpine (apk)"));
assert!(message.contains("macOS"));
assert!(message.contains("Windows"));
assert!(message.contains("without apt, dnf, or apk"));
}
// `no_fonts` removes the font package but keeps the rest of the list.
#[test]
fn desktop_packages_support_no_fonts() {
let packages = desktop_packages(DesktopPackageManager::Apt, true);
assert!(!packages.iter().any(|value| value == "fonts-dejavu-core"));
assert!(packages.iter().any(|value| value == "xvfb"));
}
// Without sudo, the rendered apk command has no prefix.
#[test]
fn render_install_command_matches_package_manager() {
let packages = vec!["xvfb".to_string(), "openbox".to_string()];
let command = render_install_command(DesktopPackageManager::Apk, false, &packages);
assert_eq!(command, "apk add --no-cache xvfb openbox");
}
}

View file

@ -0,0 +1,329 @@
use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Mutex;
use sandbox_agent_error::SandboxError;
use crate::desktop_types::{
DesktopRecordingInfo, DesktopRecordingListResponse, DesktopRecordingStartRequest,
DesktopRecordingStatus, DesktopResolution,
};
use crate::process_runtime::{
ProcessOwner, ProcessRuntime, ProcessStartSpec, ProcessStatus, RestartPolicy,
};
/// Desktop details a caller supplies when starting a recording.
#[derive(Debug, Clone)]
pub struct DesktopRecordingContext {
/// Passed to ffmpeg as the `-i` input of the `x11grab` capture.
pub display: String,
/// Environment variables handed to the ffmpeg process.
pub environment: std::collections::HashMap<String, String>,
/// Width and height are formatted as `WxH` for ffmpeg's `-video_size`.
pub resolution: DesktopResolution,
}
/// Starts, stops and tracks ffmpeg-based desktop recordings. Cloning is cheap:
/// clones share the same process runtime and the same recording state.
#[derive(Debug, Clone)]
pub struct DesktopRecordingManager {
/// Runtime used to start, stop, kill and inspect the ffmpeg process.
process_runtime: Arc<ProcessRuntime>,
/// Directory that receives the recording files (`<state_dir>/recordings`).
recordings_dir: PathBuf,
/// Shared bookkeeping guarded by an async mutex.
inner: Arc<Mutex<DesktopRecordingState>>,
}
/// In-memory bookkeeping for recordings; starts out empty via `Default`.
#[derive(Debug, Default)]
struct DesktopRecordingState {
/// Last numeric id handed out; the next recording uses `next_id + 1`.
next_id: u64,
/// Id of the recording currently considered active, if any.
current_id: Option<String>,
/// Every known recording, keyed by its id.
recordings: BTreeMap<String, RecordingEntry>,
}
/// One tracked recording: the public info plus where its file lives on disk.
#[derive(Debug, Clone)]
struct RecordingEntry {
/// The information returned to API callers.
info: DesktopRecordingInfo,
/// Full path of the recording file inside the recordings directory.
path: PathBuf,
}
impl DesktopRecordingManager {
    /// Creates a manager that stores recordings under `<state_dir>/recordings`.
    /// The directory is not created here; `start` creates it on demand.
    pub fn new(process_runtime: Arc<ProcessRuntime>, state_dir: PathBuf) -> Self {
        Self {
            process_runtime,
            recordings_dir: state_dir.join("recordings"),
            inner: Arc::new(Mutex::new(DesktopRecordingState::default())),
        }
    }

    /// Starts a new recording by launching ffmpeg with an `x11grab` capture of
    /// `context.display`, encoded with libx264 into `<recordings_dir>/rec_<n>.mp4`.
    ///
    /// The frame rate is `request.fps` clamped to `1..=60`, defaulting to 30.
    ///
    /// # Errors
    /// - `Conflict` when `ffmpeg` is not found on `PATH`, or a recording is
    ///   already active.
    /// - `StreamError` when the recordings directory cannot be created.
    /// - Any error returned by the process runtime while refreshing state or
    ///   starting ffmpeg.
    pub async fn start(
        &self,
        context: DesktopRecordingContext,
        request: DesktopRecordingStartRequest,
    ) -> Result<DesktopRecordingInfo, SandboxError> {
        if find_binary("ffmpeg").is_none() {
            return Err(SandboxError::Conflict {
                message: "ffmpeg is required for desktop recording".to_string(),
            });
        }

        self.ensure_recordings_dir()?;

        // Keep ONE guard from the "is a recording active?" check until the new
        // recording is registered. Releasing the lock in between would let two
        // concurrent `start` calls both pass the check and launch two ffmpeg
        // processes, with the second silently replacing `current_id`.
        let mut state = self.inner.lock().await;
        self.refresh_locked(&mut state).await?;
        if state.current_id.is_some() {
            return Err(SandboxError::Conflict {
                message: "a desktop recording is already active".to_string(),
            });
        }

        let id_num = state.next_id + 1;
        state.next_id = id_num;
        let id = format!("rec_{id_num}");
        let file_name = format!("{id}.mp4");
        let path = self.recordings_dir.join(&file_name);
        let fps = request.fps.unwrap_or(30).clamp(1, 60);
        let args = vec![
            "-y".to_string(),
            "-video_size".to_string(),
            format!("{}x{}", context.resolution.width, context.resolution.height),
            "-framerate".to_string(),
            fps.to_string(),
            "-f".to_string(),
            "x11grab".to_string(),
            "-i".to_string(),
            context.display,
            "-c:v".to_string(),
            "libx264".to_string(),
            "-preset".to_string(),
            "ultrafast".to_string(),
            "-pix_fmt".to_string(),
            "yuv420p".to_string(),
            path.to_string_lossy().to_string(),
        ];

        let snapshot = self
            .process_runtime
            .start_process(ProcessStartSpec {
                command: "ffmpeg".to_string(),
                args,
                cwd: None,
                env: context.environment,
                tty: false,
                interactive: false,
                owner: ProcessOwner::Desktop,
                restart_policy: Some(RestartPolicy::Never),
            })
            .await?;

        let info = DesktopRecordingInfo {
            id: id.clone(),
            status: DesktopRecordingStatus::Recording,
            process_id: Some(snapshot.id),
            file_name,
            bytes: 0,
            started_at: chrono::Utc::now().to_rfc3339(),
            ended_at: None,
        };
        state.current_id = Some(id.clone());
        state.recordings.insert(
            id,
            RecordingEntry {
                info: info.clone(),
                path,
            },
        );
        Ok(info)
    }

    /// Stops the active recording and returns its refreshed info.
    ///
    /// The ffmpeg process is asked to stop first; if the runtime still reports
    /// it as running afterwards, a kill is attempted and its result ignored.
    /// NOTE(review): `Some(5_000)` / `Some(1_000)` look like millisecond
    /// timeouts — confirm against `ProcessRuntime`.
    ///
    /// # Errors
    /// - `Conflict` when no recording is active.
    /// - `NotFound` when the recording entry disappeared while stopping.
    /// - Any error from the process runtime.
    pub async fn stop(&self) -> Result<DesktopRecordingInfo, SandboxError> {
        // The lock is released before stopping the process so other calls are
        // not blocked while waiting for ffmpeg to exit.
        let (recording_id, process_id) = {
            let mut state = self.inner.lock().await;
            self.refresh_locked(&mut state).await?;
            let recording_id = state
                .current_id
                .clone()
                .ok_or_else(|| SandboxError::Conflict {
                    message: "no desktop recording is active".to_string(),
                })?;
            let process_id = state
                .recordings
                .get(&recording_id)
                .and_then(|entry| entry.info.process_id.clone());
            (recording_id, process_id)
        };

        if let Some(process_id) = process_id {
            let snapshot = self
                .process_runtime
                .stop_process(&process_id, Some(5_000))
                .await?;
            if snapshot.status == ProcessStatus::Running {
                let _ = self
                    .process_runtime
                    .kill_process(&process_id, Some(1_000))
                    .await;
            }
        }

        let mut state = self.inner.lock().await;
        self.refresh_locked(&mut state).await?;
        let entry = state
            .recordings
            .get(&recording_id)
            .ok_or_else(|| SandboxError::NotFound {
                resource: "desktop_recording".to_string(),
                id: recording_id.clone(),
            })?;
        Ok(entry.info.clone())
    }

    /// Returns all known recordings after refreshing their status and size.
    pub async fn list(&self) -> Result<DesktopRecordingListResponse, SandboxError> {
        let mut state = self.inner.lock().await;
        self.refresh_locked(&mut state).await?;
        Ok(DesktopRecordingListResponse {
            recordings: state
                .recordings
                .values()
                .map(|entry| entry.info.clone())
                .collect(),
        })
    }

    /// Returns the refreshed info of recording `id`, or `NotFound`.
    pub async fn get(&self, id: &str) -> Result<DesktopRecordingInfo, SandboxError> {
        let mut state = self.inner.lock().await;
        self.refresh_locked(&mut state).await?;
        state
            .recordings
            .get(id)
            .map(|entry| entry.info.clone())
            .ok_or_else(|| SandboxError::NotFound {
                resource: "desktop_recording".to_string(),
                id: id.to_string(),
            })
    }

    /// Returns the on-disk path of recording `id`.
    ///
    /// # Errors
    /// `NotFound` with resource `desktop_recording` when the id is unknown, or
    /// with resource `desktop_recording_file` when the file does not exist.
    pub async fn download_path(&self, id: &str) -> Result<PathBuf, SandboxError> {
        let mut state = self.inner.lock().await;
        self.refresh_locked(&mut state).await?;
        let entry = state
            .recordings
            .get(id)
            .ok_or_else(|| SandboxError::NotFound {
                resource: "desktop_recording".to_string(),
                id: id.to_string(),
            })?;
        if !entry.path.is_file() {
            return Err(SandboxError::NotFound {
                resource: "desktop_recording_file".to_string(),
                id: id.to_string(),
            });
        }
        Ok(entry.path.clone())
    }

    /// Forgets recording `id` and deletes its file if one exists.
    ///
    /// # Errors
    /// - `Conflict` when `id` is the active recording.
    /// - `NotFound` when the id is unknown.
    /// - `StreamError` when the file cannot be removed (the entry has already
    ///   been dropped from the in-memory state at that point).
    pub async fn delete(&self, id: &str) -> Result<(), SandboxError> {
        let mut state = self.inner.lock().await;
        self.refresh_locked(&mut state).await?;
        if state.current_id.as_deref() == Some(id) {
            return Err(SandboxError::Conflict {
                message: "stop the active desktop recording before deleting it".to_string(),
            });
        }
        let entry = state
            .recordings
            .remove(id)
            .ok_or_else(|| SandboxError::NotFound {
                resource: "desktop_recording".to_string(),
                id: id.to_string(),
            })?;
        if entry.path.exists() {
            fs::remove_file(&entry.path).map_err(|err| SandboxError::StreamError {
                message: format!(
                    "failed to delete desktop recording {}: {err}",
                    entry.path.display()
                ),
            })?;
        }
        Ok(())
    }

    /// Creates the recordings directory (and parents) if it is missing.
    fn ensure_recordings_dir(&self) -> Result<(), SandboxError> {
        fs::create_dir_all(&self.recordings_dir).map_err(|err| SandboxError::StreamError {
            message: format!(
                "failed to create desktop recordings dir {}: {err}",
                self.recordings_dir.display()
            ),
        })
    }

    /// Brings every tracked recording up to date with its ffmpeg process.
    ///
    /// Running recordings only get their byte count refreshed. Recordings
    /// whose process has exited, or is no longer known to the runtime, are
    /// finalized, and `current_id` is cleared if it pointed at them.
    async fn refresh_locked(&self, state: &mut DesktopRecordingState) -> Result<(), SandboxError> {
        let ids: Vec<String> = state.recordings.keys().cloned().collect();
        for id in ids {
            let finished = {
                let Some(entry) = state.recordings.get_mut(&id) else {
                    continue;
                };
                let Some(process_id) = entry.info.process_id.clone() else {
                    Self::refresh_bytes(entry);
                    continue;
                };
                match self.process_runtime.snapshot(&process_id).await {
                    Ok(snapshot) if snapshot.status == ProcessStatus::Running => {
                        Self::refresh_bytes(entry);
                        false
                    }
                    Ok(snapshot) => {
                        Self::finalize_entry(entry, snapshot.exit_code == Some(0));
                        true
                    }
                    // A process the runtime no longer knows cannot still be
                    // recording. Report it as finished so `current_id` is
                    // cleared; skipping that step would leave the recording
                    // "active" forever and block `start` and `delete`.
                    Err(SandboxError::NotFound { .. }) => {
                        Self::finalize_entry(entry, false);
                        true
                    }
                    Err(err) => return Err(err),
                }
            };

            if finished && state.current_id.as_deref() == Some(id.as_str()) {
                state.current_id = None;
            }
        }

        Ok(())
    }

    /// Updates the entry's byte count from the file on disk (0 if unreadable).
    fn refresh_bytes(entry: &mut RecordingEntry) {
        entry.info.bytes = file_size(&entry.path);
    }

    /// Marks the entry as finished.
    ///
    /// The status becomes `Completed` when `success` is true or a non-empty
    /// file exists, otherwise `Failed`. `ended_at` is set only the first time.
    fn finalize_entry(entry: &mut RecordingEntry, success: bool) {
        let bytes = file_size(&entry.path);
        entry.info.status = if success || (entry.path.is_file() && bytes > 0) {
            DesktopRecordingStatus::Completed
        } else {
            DesktopRecordingStatus::Failed
        };
        entry
            .info
            .ended_at
            .get_or_insert_with(|| chrono::Utc::now().to_rfc3339());
        entry.info.bytes = bytes;
    }
}
/// Looks for a regular file named `name` in the directories listed in `PATH`,
/// in order, and returns the first hit. `None` when `PATH` is unset or nothing
/// matches.
fn find_binary(name: &str) -> Option<PathBuf> {
    let search_path = std::env::var_os("PATH")?;
    let mut candidates = std::env::split_paths(&search_path).map(|dir| dir.join(name));
    candidates.find(|candidate| candidate.is_file())
}
/// Size of the file at `path` in bytes, or 0 when its metadata cannot be read
/// (for example because the file does not exist).
fn file_size(path: &Path) -> u64 {
    match fs::metadata(path) {
        Ok(metadata) => metadata.len(),
        Err(_) => 0,
    }
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,47 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use sandbox_agent_error::SandboxError;
use crate::desktop_types::DesktopStreamStatusResponse;
/// Tracks whether desktop streaming has been switched on. Clones share the
/// same underlying state.
#[derive(Debug, Clone)]
pub struct DesktopStreamingManager {
/// Shared streaming state guarded by an async mutex.
inner: Arc<Mutex<DesktopStreamingState>>,
}
/// Streaming state; `Default` yields `active == false`.
#[derive(Debug, Default)]
struct DesktopStreamingState {
/// Set by `start`, cleared by `stop`, checked by `ensure_active`.
active: bool,
}
impl DesktopStreamingManager {
    /// Creates a manager with streaming marked inactive.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(DesktopStreamingState::default())),
        }
    }

    /// Marks streaming as active and returns the new status.
    pub async fn start(&self) -> DesktopStreamStatusResponse {
        let mut state = self.inner.lock().await;
        state.active = true;
        DesktopStreamStatusResponse { active: true }
    }

    /// Marks streaming as inactive and returns the new status.
    pub async fn stop(&self) -> DesktopStreamStatusResponse {
        let mut state = self.inner.lock().await;
        state.active = false;
        DesktopStreamStatusResponse { active: false }
    }

    /// Succeeds only while streaming is active.
    ///
    /// # Errors
    /// Returns `SandboxError::Conflict` when streaming has not been started or
    /// has been stopped.
    pub async fn ensure_active(&self) -> Result<(), SandboxError> {
        if self.inner.lock().await.active {
            Ok(())
        } else {
            Err(SandboxError::Conflict {
                message: "desktop streaming is not active".to_string(),
            })
        }
    }
}

/// A public zero-argument `new()` should be reachable through `Default` too,
/// so the type works with `..Default::default()` and `#[derive(Default)]` on
/// containing structs.
impl Default for DesktopStreamingManager {
    fn default() -> Self {
        Self::new()
    }
}

View file

@ -0,0 +1,302 @@
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};
// Lifecycle state reported in `DesktopStatusResponse::state`. Variants
// serialize in snake_case (for example `install_required`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DesktopState {
Inactive,
InstallRequired,
Starting,
Active,
Stopping,
Failed,
}
// Display dimensions plus an optional DPI; fields serialize in camelCase.
// NOTE(review): width/height are presumably pixels — confirm with the runtime.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopResolution {
pub width: u32,
pub height: u32,
// Left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dpi: Option<u32>,
}
// Compact error description: a machine-readable code and a message.
// Produced by `DesktopProblem::to_error_info`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopErrorInfo {
pub code: String,
pub message: String,
}
// Description of one process attached to desktop status and error payloads.
// Fields serialize in camelCase (`logPath`).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopProcessInfo {
pub name: String,
// Left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
pub running: bool,
// Left out of the serialized form when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub log_path: Option<String>,
}
// Status payload for the desktop runtime; fields serialize in camelCase.
// Optional fields are omitted when `None`; the two list fields are always
// serialized and default to empty when absent on input.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopStatusResponse {
pub state: DesktopState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub display: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resolution: Option<DesktopResolution>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub started_at: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_error: Option<DesktopErrorInfo>,
// Always serialized, even when empty.
#[serde(default)]
pub missing_dependencies: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub install_command: Option<String>,
// Always serialized, even when empty.
#[serde(default)]
pub processes: Vec<DesktopProcessInfo>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub runtime_log_path: Option<String>,
}
// Optional display settings for starting the desktop; every field may be
// omitted, and `Default` yields all `None`.
// NOTE(review): the fallback values used when a field is `None` are decided
// by the runtime, which is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, IntoParams, Default)]
#[serde(rename_all = "camelCase")]
pub struct DesktopStartRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub width: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub height: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dpi: Option<u32>,
}
// Optional parameters for a full-screen screenshot; all fields may be omitted.
// NOTE(review): valid ranges for `quality` and `scale` are enforced (if at
// all) by the runtime, which is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, IntoParams, Default)]
#[serde(rename_all = "camelCase")]
pub struct DesktopScreenshotQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub format: Option<DesktopScreenshotFormat>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quality: Option<u8>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub scale: Option<f32>,
}
// Image formats selectable for screenshots. Variants serialize in lowercase
// (`png`, `jpeg`, `webp`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DesktopScreenshotFormat {
Png,
Jpeg,
Webp,
}
// Parameters for a screenshot of a rectangular region. `x`, `y`, `width` and
// `height` are required; the remaining fields mirror `DesktopScreenshotQuery`
// and may be omitted.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, IntoParams)]
#[serde(rename_all = "camelCase")]
pub struct DesktopRegionScreenshotQuery {
pub x: i32,
pub y: i32,
pub width: u32,
pub height: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub format: Option<DesktopScreenshotFormat>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quality: Option<u8>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub scale: Option<f32>,
}
// Pointer position payload. `screen` and `window` are optional and omitted
// from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMousePositionResponse {
pub x: i32,
pub y: i32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub screen: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub window: Option<String>,
}
// Mouse buttons accepted by the mouse request types. Variants serialize in
// lowercase (`left`, `middle`, `right`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DesktopMouseButton {
Left,
Middle,
Right,
}
// Request body for moving the pointer to the coordinates `x`, `y`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMouseMoveRequest {
pub x: i32,
pub y: i32,
}
// Request body for a click at `x`, `y`. `button` and `clickCount` are
// optional; the values used when they are omitted are chosen by the handler,
// which is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMouseClickRequest {
pub x: i32,
pub y: i32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub button: Option<DesktopMouseButton>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub click_count: Option<u32>,
}
// Request body for pressing a mouse button. Position and button are all
// optional and omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMouseDownRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub x: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub y: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub button: Option<DesktopMouseButton>,
}
// Request body for releasing a mouse button. Same shape as
// `DesktopMouseDownRequest`: every field is optional.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMouseUpRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub x: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub y: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub button: Option<DesktopMouseButton>,
}
// Request body for a drag from (`startX`, `startY`) to (`endX`, `endY`), with
// an optional button.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMouseDragRequest {
pub start_x: i32,
pub start_y: i32,
pub end_x: i32,
pub end_y: i32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub button: Option<DesktopMouseButton>,
}
// Request body for scrolling at `x`, `y`. `deltaX` and `deltaY` are optional.
// NOTE(review): the unit and sign convention of the deltas is defined by the
// handler — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMouseScrollRequest {
pub x: i32,
pub y: i32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delta_x: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delta_y: Option<i32>,
}
// Request body for typing `text`, with an optional `delayMs`.
// NOTE(review): `delayMs` is presumably the per-keystroke delay in
// milliseconds — confirm against the handler.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopKeyboardTypeRequest {
pub text: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delay_ms: Option<u32>,
}
// Request body for pressing `key`, optionally combined with modifiers.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopKeyboardPressRequest {
pub key: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub modifiers: Option<DesktopKeyModifiers>,
}
// Optional modifier flags for a key press. Each flag may be omitted;
// `Default` yields all `None`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub struct DesktopKeyModifiers {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ctrl: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub shift: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub alt: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cmd: Option<bool>,
}
// Request body for holding `key` down.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopKeyboardDownRequest {
pub key: String,
}
// Request body for releasing `key`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopKeyboardUpRequest {
pub key: String,
}
// Generic acknowledgement payload carrying a single `ok` flag.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopActionResponse {
pub ok: bool,
}
// Display identifier together with its resolution.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopDisplayInfoResponse {
pub display: String,
pub resolution: DesktopResolution,
}
// Description of one window: id, title, position, size and whether it is the
// active window. Fields serialize in camelCase (`isActive`).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopWindowInfo {
pub id: String,
pub title: String,
pub x: i32,
pub y: i32,
pub width: u32,
pub height: u32,
pub is_active: bool,
}
// Wrapper payload holding a list of windows.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopWindowListResponse {
pub windows: Vec<DesktopWindowInfo>,
}
// Options for starting a desktop recording.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, Default)]
#[serde(rename_all = "camelCase")]
pub struct DesktopRecordingStartRequest {
// Requested frame rate. `DesktopRecordingManager::start` uses 30 when this
// is omitted and clamps the value to the range 1..=60.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub fps: Option<u32>,
}
// State of a recording. Variants serialize in lowercase. A recording starts
// as `Recording`; once its process is no longer running it becomes
// `Completed` (clean exit or a non-empty file exists) or `Failed`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DesktopRecordingStatus {
Recording,
Completed,
Failed,
}
// Public description of one recording; fields serialize in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopRecordingInfo {
// Identifier of the form `rec_<n>`.
pub id: String,
pub status: DesktopRecordingStatus,
// Id of the ffmpeg process in the process runtime; omitted when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub process_id: Option<String>,
// File name inside the recordings directory (`<id>.mp4`).
pub file_name: String,
// Size of the recording file as last observed on disk.
pub bytes: u64,
// RFC 3339 timestamp taken when the recording was started.
pub started_at: String,
// RFC 3339 timestamp set once the recording is finalized; omitted until then.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ended_at: Option<String>,
}
// Wrapper payload holding every known recording.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopRecordingListResponse {
pub recordings: Vec<DesktopRecordingInfo>,
}
// Streaming status payload returned by `DesktopStreamingManager::start` and
// `DesktopStreamingManager::stop`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopStreamStatusResponse {
pub active: bool,
}

View file

@ -3,6 +3,12 @@
mod acp_proxy_runtime;
pub mod cli;
pub mod daemon;
mod desktop_errors;
mod desktop_install;
mod desktop_recording;
mod desktop_runtime;
mod desktop_streaming;
pub mod desktop_types;
mod process_runtime;
pub mod router;
pub mod server_logs;

View file

@ -1,5 +1,5 @@
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
@ -27,6 +27,22 @@ pub enum ProcessStream {
Pty,
}
// Tags a managed process with the party that owns it.
// Serialized as the lowercase variant name: "user", "desktop", "system".
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessOwner {
User,
Desktop,
System,
}
// Policy describing when an exited process should be restarted.
// Serialized in snake_case: "never", "always", "on_failure".
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RestartPolicy {
Never,
Always,
OnFailure,
}
#[derive(Debug, Clone)]
pub struct ProcessStartSpec {
pub command: String,
@ -35,6 +51,8 @@ pub struct ProcessStartSpec {
pub env: HashMap<String, String>,
pub tty: bool,
pub interactive: bool,
pub owner: ProcessOwner,
pub restart_policy: Option<RestartPolicy>,
}
#[derive(Debug, Clone)]
@ -78,6 +96,7 @@ pub struct ProcessSnapshot {
pub cwd: Option<String>,
pub tty: bool,
pub interactive: bool,
pub owner: ProcessOwner,
pub status: ProcessStatus,
pub pid: Option<u32>,
pub exit_code: Option<i32>,
@ -129,17 +148,27 @@ struct ManagedProcess {
cwd: Option<String>,
tty: bool,
interactive: bool,
owner: ProcessOwner,
#[allow(dead_code)]
restart_policy: RestartPolicy,
spec: ProcessStartSpec,
created_at_ms: i64,
pid: Option<u32>,
max_log_bytes: usize,
stdin: Mutex<Option<ProcessStdin>>,
#[cfg(unix)]
pty_resize_fd: Mutex<Option<std::fs::File>>,
runtime: Mutex<ManagedRuntime>,
status: RwLock<ManagedStatus>,
sequence: AtomicU64,
logs: Mutex<VecDeque<StoredLog>>,
total_log_bytes: Mutex<usize>,
log_tx: broadcast::Sender<ProcessLogLine>,
stop_requested: AtomicBool,
}
// Per-run handles of a managed process: the pieces that are replaced on every
// spawn and cleared again once the child has exited.
#[derive(Debug)]
struct ManagedRuntime {
// OS process id of the current child, if one is running.
pid: Option<u32>,
// Writable input handle (pipe or PTY writer) for the current child, if any.
stdin: Option<ProcessStdin>,
// PTY file handle used for resize requests; only present for TTY processes.
#[cfg(unix)]
pty_resize_fd: Option<std::fs::File>,
}
#[derive(Debug)]
@ -162,17 +191,17 @@ struct ManagedStatus {
}
struct SpawnedPipeProcess {
process: Arc<ManagedProcess>,
child: Child,
stdout: tokio::process::ChildStdout,
stderr: tokio::process::ChildStderr,
runtime: ManagedRuntime,
}
#[cfg(unix)]
struct SpawnedTtyProcess {
process: Arc<ManagedProcess>,
child: Child,
reader: tokio::fs::File,
runtime: ManagedRuntime,
}
impl ProcessRuntime {
@ -224,21 +253,14 @@ impl ProcessRuntime {
&self,
spec: ProcessStartSpec,
) -> Result<ProcessSnapshot, SandboxError> {
let config = self.get_config().await;
let process_refs = {
let processes = self.inner.processes.read().await;
processes.values().cloned().collect::<Vec<_>>()
};
let mut running_count = 0usize;
for process in process_refs {
if process.status.read().await.status == ProcessStatus::Running {
running_count += 1;
}
if spec.command.trim().is_empty() {
return Err(SandboxError::InvalidRequest {
message: "command must not be empty".to_string(),
});
}
if running_count >= config.max_concurrent_processes {
let config = self.get_config().await;
if self.running_process_count().await >= config.max_concurrent_processes {
return Err(SandboxError::Conflict {
message: format!(
"max concurrent process limit reached ({})",
@ -247,73 +269,44 @@ impl ProcessRuntime {
});
}
if spec.command.trim().is_empty() {
return Err(SandboxError::InvalidRequest {
message: "command must not be empty".to_string(),
});
}
let id_num = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
let id = format!("proc_{id_num}");
let process = Arc::new(ManagedProcess {
id: id.clone(),
command: spec.command.clone(),
args: spec.args.clone(),
cwd: spec.cwd.clone(),
tty: spec.tty,
interactive: spec.interactive,
owner: spec.owner,
restart_policy: spec.restart_policy.unwrap_or(RestartPolicy::Never),
spec,
created_at_ms: now_ms(),
max_log_bytes: config.max_log_bytes_per_process,
runtime: Mutex::new(ManagedRuntime {
pid: None,
stdin: None,
#[cfg(unix)]
pty_resize_fd: None,
}),
status: RwLock::new(ManagedStatus {
status: ProcessStatus::Running,
exit_code: None,
exited_at_ms: None,
}),
sequence: AtomicU64::new(1),
logs: Mutex::new(VecDeque::new()),
total_log_bytes: Mutex::new(0),
log_tx: broadcast::channel(512).0,
stop_requested: AtomicBool::new(false),
});
if spec.tty {
#[cfg(unix)]
{
let spawned = self
.spawn_tty_process(id.clone(), spec, config.max_log_bytes_per_process)
.await?;
let process = spawned.process.clone();
self.inner
.processes
.write()
.await
.insert(id, process.clone());
let p = process.clone();
tokio::spawn(async move {
pump_output(p, spawned.reader, ProcessStream::Pty).await;
});
let p = process.clone();
tokio::spawn(async move {
watch_exit(p, spawned.child).await;
});
return Ok(process.snapshot().await);
}
#[cfg(not(unix))]
{
return Err(SandboxError::StreamError {
message: "tty process mode is not supported on this platform".to_string(),
});
}
}
let spawned = self
.spawn_pipe_process(id.clone(), spec, config.max_log_bytes_per_process)
.await?;
let process = spawned.process.clone();
self.spawn_existing_process(process.clone()).await?;
self.inner
.processes
.write()
.await
.insert(id, process.clone());
let p = process.clone();
tokio::spawn(async move {
pump_output(p, spawned.stdout, ProcessStream::Stdout).await;
});
let p = process.clone();
tokio::spawn(async move {
pump_output(p, spawned.stderr, ProcessStream::Stderr).await;
});
let p = process.clone();
tokio::spawn(async move {
watch_exit(p, spawned.child).await;
});
Ok(process.snapshot().await)
}
@ -412,11 +405,13 @@ impl ProcessRuntime {
})
}
pub async fn list_processes(&self) -> Vec<ProcessSnapshot> {
pub async fn list_processes(&self, owner: Option<ProcessOwner>) -> Vec<ProcessSnapshot> {
let processes = self.inner.processes.read().await;
let mut items = Vec::with_capacity(processes.len());
for process in processes.values() {
items.push(process.snapshot().await);
if owner.is_none_or(|expected| process.owner == expected) {
items.push(process.snapshot().await);
}
}
items.sort_by(|a, b| a.id.cmp(&b.id));
items
@ -453,6 +448,7 @@ impl ProcessRuntime {
wait_ms: Option<u64>,
) -> Result<ProcessSnapshot, SandboxError> {
let process = self.lookup_process(id).await?;
process.stop_requested.store(true, Ordering::SeqCst);
process.send_signal(SIGTERM).await?;
maybe_wait_for_exit(process.clone(), wait_ms.unwrap_or(2_000)).await;
Ok(process.snapshot().await)
@ -464,6 +460,7 @@ impl ProcessRuntime {
wait_ms: Option<u64>,
) -> Result<ProcessSnapshot, SandboxError> {
let process = self.lookup_process(id).await?;
process.stop_requested.store(true, Ordering::SeqCst);
process.send_signal(SIGKILL).await?;
maybe_wait_for_exit(process.clone(), wait_ms.unwrap_or(1_000)).await;
Ok(process.snapshot().await)
@ -506,6 +503,17 @@ impl ProcessRuntime {
Ok(process.log_tx.subscribe())
}
/// Counts the managed processes whose status is currently `Running`.
///
/// The process table stays read-locked for the whole scan while each
/// process's status lock is taken briefly in turn.
async fn running_process_count(&self) -> usize {
    let table = self.inner.processes.read().await;
    let mut active = 0usize;
    for entry in table.values() {
        let state = entry.status.read().await;
        if matches!(state.status, ProcessStatus::Running) {
            active += 1;
        }
    }
    active
}
async fn lookup_process(&self, id: &str) -> Result<Arc<ManagedProcess>, SandboxError> {
let process = self.inner.processes.read().await.get(id).cloned();
process.ok_or_else(|| SandboxError::NotFound {
@ -514,11 +522,83 @@ impl ProcessRuntime {
})
}
async fn spawn_pipe_process(
async fn spawn_existing_process(
&self,
id: String,
spec: ProcessStartSpec,
max_log_bytes: usize,
process: Arc<ManagedProcess>,
) -> Result<(), SandboxError> {
process.stop_requested.store(false, Ordering::SeqCst);
let mut runtime_guard = process.runtime.lock().await;
let mut status_guard = process.status.write().await;
if process.tty {
#[cfg(unix)]
{
let SpawnedTtyProcess {
child,
reader,
runtime,
} = self.spawn_tty_process(&process.spec)?;
*runtime_guard = runtime;
status_guard.status = ProcessStatus::Running;
status_guard.exit_code = None;
status_guard.exited_at_ms = None;
drop(status_guard);
drop(runtime_guard);
let process_for_output = process.clone();
tokio::spawn(async move {
pump_output(process_for_output, reader, ProcessStream::Pty).await;
});
let runtime = self.clone();
tokio::spawn(async move {
watch_exit(runtime, process, child).await;
});
return Ok(());
}
#[cfg(not(unix))]
{
return Err(SandboxError::StreamError {
message: "tty process mode is not supported on this platform".to_string(),
});
}
}
let SpawnedPipeProcess {
child,
stdout,
stderr,
runtime,
} = self.spawn_pipe_process(&process.spec)?;
*runtime_guard = runtime;
status_guard.status = ProcessStatus::Running;
status_guard.exit_code = None;
status_guard.exited_at_ms = None;
drop(status_guard);
drop(runtime_guard);
let process_for_stdout = process.clone();
tokio::spawn(async move {
pump_output(process_for_stdout, stdout, ProcessStream::Stdout).await;
});
let process_for_stderr = process.clone();
tokio::spawn(async move {
pump_output(process_for_stderr, stderr, ProcessStream::Stderr).await;
});
let runtime = self.clone();
tokio::spawn(async move {
watch_exit(runtime, process, child).await;
});
Ok(())
}
fn spawn_pipe_process(
&self,
spec: &ProcessStartSpec,
) -> Result<SpawnedPipeProcess, SandboxError> {
let mut cmd = Command::new(&spec.command);
cmd.args(&spec.args)
@ -551,35 +631,14 @@ impl ProcessRuntime {
.ok_or_else(|| SandboxError::StreamError {
message: "failed to capture stderr".to_string(),
})?;
let pid = child.id();
let (tx, _rx) = broadcast::channel(512);
let process = Arc::new(ManagedProcess {
id,
command: spec.command,
args: spec.args,
cwd: spec.cwd,
tty: false,
interactive: spec.interactive,
created_at_ms: now_ms(),
pid,
max_log_bytes,
stdin: Mutex::new(stdin.map(ProcessStdin::Pipe)),
#[cfg(unix)]
pty_resize_fd: Mutex::new(None),
status: RwLock::new(ManagedStatus {
status: ProcessStatus::Running,
exit_code: None,
exited_at_ms: None,
}),
sequence: AtomicU64::new(1),
logs: Mutex::new(VecDeque::new()),
total_log_bytes: Mutex::new(0),
log_tx: tx,
});
Ok(SpawnedPipeProcess {
process,
runtime: ManagedRuntime {
pid: child.id(),
stdin: stdin.map(ProcessStdin::Pipe),
#[cfg(unix)]
pty_resize_fd: None,
},
child,
stdout,
stderr,
@ -587,11 +646,9 @@ impl ProcessRuntime {
}
#[cfg(unix)]
async fn spawn_tty_process(
fn spawn_tty_process(
&self,
id: String,
spec: ProcessStartSpec,
max_log_bytes: usize,
spec: &ProcessStartSpec,
) -> Result<SpawnedTtyProcess, SandboxError> {
use std::os::fd::AsRawFd;
use std::process::Stdio;
@ -632,8 +689,8 @@ impl ProcessRuntime {
let child = cmd.spawn().map_err(|err| SandboxError::StreamError {
message: format!("failed to spawn tty process: {err}"),
})?;
let pid = child.id();
drop(slave_fd);
let master_raw = master_fd.as_raw_fd();
@ -644,32 +701,12 @@ impl ProcessRuntime {
let writer_file = tokio::fs::File::from_std(std::fs::File::from(writer_fd));
let resize_file = std::fs::File::from(resize_fd);
let (tx, _rx) = broadcast::channel(512);
let process = Arc::new(ManagedProcess {
id,
command: spec.command,
args: spec.args,
cwd: spec.cwd,
tty: true,
interactive: spec.interactive,
created_at_ms: now_ms(),
pid,
max_log_bytes,
stdin: Mutex::new(Some(ProcessStdin::Pty(writer_file))),
pty_resize_fd: Mutex::new(Some(resize_file)),
status: RwLock::new(ManagedStatus {
status: ProcessStatus::Running,
exit_code: None,
exited_at_ms: None,
}),
sequence: AtomicU64::new(1),
logs: Mutex::new(VecDeque::new()),
total_log_bytes: Mutex::new(0),
log_tx: tx,
});
Ok(SpawnedTtyProcess {
process,
runtime: ManagedRuntime {
pid,
stdin: Some(ProcessStdin::Pty(writer_file)),
pty_resize_fd: Some(resize_file),
},
child,
reader: reader_file,
})
@ -694,6 +731,7 @@ pub struct ProcessLogFilter {
impl ManagedProcess {
async fn snapshot(&self) -> ProcessSnapshot {
let status = self.status.read().await.clone();
let pid = self.runtime.lock().await.pid;
ProcessSnapshot {
id: self.id.clone(),
command: self.command.clone(),
@ -701,8 +739,9 @@ impl ManagedProcess {
cwd: self.cwd.clone(),
tty: self.tty,
interactive: self.interactive,
owner: self.owner,
status: status.status,
pid: self.pid,
pid,
exit_code: status.exit_code,
created_at_ms: self.created_at_ms,
exited_at_ms: status.exited_at_ms,
@ -752,10 +791,13 @@ impl ManagedProcess {
});
}
let mut guard = self.stdin.lock().await;
let stdin = guard.as_mut().ok_or_else(|| SandboxError::Conflict {
message: "process does not accept stdin".to_string(),
})?;
let mut runtime = self.runtime.lock().await;
let stdin = runtime
.stdin
.as_mut()
.ok_or_else(|| SandboxError::Conflict {
message: "process does not accept stdin".to_string(),
})?;
match stdin {
ProcessStdin::Pipe(pipe) => {
@ -825,7 +867,7 @@ impl ManagedProcess {
if self.status.read().await.status != ProcessStatus::Running {
return Ok(());
}
let Some(pid) = self.pid else {
let Some(pid) = self.runtime.lock().await.pid else {
return Ok(());
};
@ -840,8 +882,9 @@ impl ManagedProcess {
#[cfg(unix)]
{
use std::os::fd::AsRawFd;
let guard = self.pty_resize_fd.lock().await;
let Some(fd) = guard.as_ref() else {
let runtime = self.runtime.lock().await;
let Some(fd) = runtime.pty_resize_fd.as_ref() else {
return Err(SandboxError::Conflict {
message: "PTY resize handle unavailable".to_string(),
});
@ -857,6 +900,32 @@ impl ManagedProcess {
Ok(())
}
/// Decides whether the process should be relaunched after exiting with `exit_code`.
///
/// `Never` never restarts, `Always` always restarts, and `OnFailure` restarts
/// for any exit code other than zero; a missing exit code counts as a failure.
#[allow(dead_code)] // no caller in this file yet
fn should_restart(&self, exit_code: Option<i32>) -> bool {
    let failed = exit_code != Some(0);
    match self.restart_policy {
        RestartPolicy::Always => true,
        RestartPolicy::OnFailure => failed,
        RestartPolicy::Never => false,
    }
}
/// Records that the child has exited and releases its per-run handles.
///
/// The status is switched to `Exited` with the given exit code and timestamp,
/// then the pid, stdin handle and (on unix) PTY resize handle are cleared.
async fn mark_exited(&self, exit_code: Option<i32>, exited_at_ms: Option<i64>) {
    // Scope the status write guard so it is released before the runtime lock
    // is acquired.
    {
        let mut state = self.status.write().await;
        state.exit_code = exit_code;
        state.exited_at_ms = exited_at_ms;
        state.status = ProcessStatus::Exited;
    }

    let mut handles = self.runtime.lock().await;
    handles.pid = None;
    drop(handles.stdin.take());
    #[cfg(unix)]
    {
        drop(handles.pty_resize_fd.take());
    }
}
}
fn stream_matches(stream: ProcessStream, filter: ProcessLogFilterStream) -> bool {
@ -909,21 +978,16 @@ where
}
}
async fn watch_exit(process: Arc<ManagedProcess>, mut child: Child) {
async fn watch_exit(runtime: ProcessRuntime, process: Arc<ManagedProcess>, mut child: Child) {
let _ = runtime;
let wait = child.wait().await;
let (exit_code, exited_at_ms) = match wait {
Ok(status) => (status.code(), Some(now_ms())),
Err(_) => (None, Some(now_ms())),
};
{
let mut state = process.status.write().await;
state.status = ProcessStatus::Exited;
state.exit_code = exit_code;
state.exited_at_ms = exited_at_ms;
}
let _ = process.stdin.lock().await.take();
let _ = process.stop_requested.swap(false, Ordering::SeqCst);
process.mark_exited(exit_code, exited_at_ms).await;
}
async fn capture_output<R>(mut reader: R, max_bytes: usize) -> std::io::Result<(Vec<u8>, bool)>

File diff suppressed because it is too large Load diff

View file

@ -33,7 +33,8 @@ pub(super) async fn require_token(
.and_then(|value| value.to_str().ok())
.and_then(|value| value.strip_prefix("Bearer "));
let allow_query_token = request.uri().path().ends_with("/terminal/ws");
let allow_query_token = request.uri().path().ends_with("/terminal/ws")
|| request.uri().path().ends_with("/stream/ws");
let query_token = if allow_query_token {
request
.uri()

View file

@ -425,6 +425,14 @@ pub enum ProcessState {
Exited,
}
// API-level tag naming the party that owns a process.
// Serialized as the lowercase variant name: "user", "desktop", "system".
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessOwner {
User,
Desktop,
System,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInfo {
@ -435,6 +443,7 @@ pub struct ProcessInfo {
pub cwd: Option<String>,
pub tty: bool,
pub interactive: bool,
pub owner: ProcessOwner,
pub status: ProcessState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
@ -451,6 +460,13 @@ pub struct ProcessListResponse {
pub processes: Vec<ProcessInfo>,
}
// Query parameters accepted when listing processes.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, IntoParams)]
#[serde(rename_all = "camelCase")]
pub struct ProcessListQuery {
// Optional owner filter: `None` when the parameter is absent, and omitted
// from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub owner: Option<ProcessOwner>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessLogsStream {

View file

@ -0,0 +1,593 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fs;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use sandbox_agent::router::AuthConfig;
use serial_test::serial;
use tempfile::TempDir;
// Port the server listens on inside the container; it is published to a host
// port when the container is started.
const CONTAINER_PORT: u16 = 3000;
// PATH suffix appended after any custom entries (and used alone when there are none).
const DEFAULT_PATH: &str = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin";
// Image tag used unless SANDBOX_AGENT_TEST_IMAGE overrides it.
const DEFAULT_IMAGE_TAG: &str = "sandbox-agent-test:dev";
// PATH entries that are never bind-mounted into the container.
// Lists the same directories as DEFAULT_PATH; keep the two in sync.
const STANDARD_PATHS: &[&str] = &[
"/usr/local/sbin",
"/usr/local/bin",
"/usr/sbin",
"/usr/bin",
"/sbin",
"/bin",
];
// Tag of the test image, filled in by the first successful image build.
static IMAGE_TAG: OnceLock<String> = OnceLock::new();
// Resolved path of the docker executable, looked up once.
static DOCKER_BIN: OnceLock<PathBuf> = OnceLock::new();
// Per-process counter that keeps generated container names distinct.
static CONTAINER_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Handle to a server reachable at `base_url`, used to build request URLs.
#[derive(Clone)]
pub struct DockerApp {
    base_url: String,
}

impl DockerApp {
    /// Returns `base_url` with `path` appended verbatim.
    pub fn http_url(&self, path: &str) -> String {
        let mut url = String::with_capacity(self.base_url.len() + path.len());
        url.push_str(&self.base_url);
        url.push_str(path);
        url
    }

    /// Returns the WebSocket URL for `path`: a leading `http://` on the base
    /// URL is dropped (if present) and `ws://` is put in front instead.
    pub fn ws_url(&self, path: &str) -> String {
        let authority = match self.base_url.strip_prefix("http://") {
            Some(rest) => rest,
            None => self.base_url.as_str(),
        };
        format!("ws://{authority}{path}")
    }
}
// A running test container plus the temporary host directories backing it.
// The container is force-removed when this value is dropped.
pub struct TestApp {
// URL helper pointing at the container's published port.
pub app: DockerApp,
// Install directory inside the temporary root, exposed via `install_path`.
install_dir: PathBuf,
// Temporary root directory; held so the directory lives as long as the app.
_root: TempDir,
// Name given to the container; used to remove it on drop.
container_id: String,
}
// Optional knobs for starting a test container; `Default` leaves all of them empty/false.
#[derive(Default)]
pub struct TestAppOptions {
// Extra environment variables for the container. A `PATH` entry here is only
// honoured when `replace_path` is true.
pub env: BTreeMap<String, String>,
// Additional PATH directories; only absolute, existing paths are used.
pub extra_paths: Vec<PathBuf>,
// When true, PATH is taken solely from `env["PATH"]` (empty if absent)
// instead of being assembled from detected entries.
pub replace_path: bool,
}
// Constructors and accessors for a Docker-backed test server.
impl TestApp {
// Starts a container with default options and no install-dir setup.
pub fn new(auth: AuthConfig) -> Self {
Self::with_setup(auth, |_| {})
}
// Starts a container with default options after running `setup` on the
// install directory.
pub fn with_setup<F>(auth: AuthConfig, setup: F) -> Self
where
F: FnOnce(&Path),
{
Self::with_options(auth, TestAppOptions::default(), setup)
}
// Full constructor: creates the temporary layout, lets `setup` populate the
// install directory, then builds the image (once), environment and mounts and
// starts the container. Panics if any of those steps fails.
pub fn with_options<F>(auth: AuthConfig, options: TestAppOptions, setup: F) -> Self
where
F: FnOnce(&Path),
{
let root = tempfile::tempdir().expect("create docker test root");
let layout = TestLayout::new(root.path());
// Directories must exist before `setup` writes into the install dir.
layout.create();
setup(&layout.install_dir);
let container_id = unique_container_id();
let image = ensure_test_image();
let env = build_env(&layout, &auth, &options);
let mounts = build_mounts(root.path(), &env);
// Returns only once the server inside the container answers its health check.
let base_url = run_container(&container_id, &image, &mounts, &env, &auth);
Self {
app: DockerApp { base_url },
install_dir: layout.install_dir,
_root: root,
container_id,
}
}
// Install directory on the host (inside the temporary root).
pub fn install_path(&self) -> &Path {
&self.install_dir
}
// Temporary root directory that holds the whole test layout.
pub fn root_path(&self) -> &Path {
self._root.path()
}
}
impl Drop for TestApp {
    /// Force-removes the container when the test app goes out of scope.
    ///
    /// Best effort: the outcome of `docker rm -f` is deliberately ignored.
    fn drop(&mut self) {
        let mut remove = Command::new(docker_bin());
        remove.arg("rm").arg("-f").arg(&self.container_id);
        let _ = remove.output();
    }
}
pub struct LiveServer {
base_url: String,
}
impl LiveServer {
pub async fn spawn(app: DockerApp) -> Self {
Self {
base_url: app.base_url,
}
}
pub fn http_url(&self, path: &str) -> String {
format!("{}{}", self.base_url, path)
}
pub fn ws_url(&self, path: &str) -> String {
let suffix = self
.base_url
.strip_prefix("http://")
.unwrap_or(&self.base_url);
format!("ws://{suffix}{path}")
}
pub async fn shutdown(self) {}
}
/// Host-side directory layout created under a test's temporary root.
struct TestLayout {
    home: PathBuf,
    xdg_data_home: PathBuf,
    xdg_state_home: PathBuf,
    appdata: PathBuf,
    local_appdata: PathBuf,
    install_dir: PathBuf,
}

impl TestLayout {
    /// Computes every path of the layout relative to `root`; nothing is
    /// created on disk.
    fn new(root: &Path) -> Self {
        let appdata_root = root.join("appdata");
        let xdg_data_home = root.join("xdg-data");
        Self {
            home: root.join("home"),
            // Built from a borrow of `xdg_data_home` before it is moved below.
            install_dir: xdg_data_home.join("sandbox-agent").join("bin"),
            xdg_data_home,
            xdg_state_home: root.join("xdg-state"),
            appdata: appdata_root.join("Roaming"),
            local_appdata: appdata_root.join("Local"),
        }
    }

    /// Creates every directory of the layout, panicking on the first failure.
    fn create(&self) {
        let required = [
            &self.home,
            &self.xdg_data_home,
            &self.xdg_state_home,
            &self.appdata,
            &self.local_appdata,
            &self.install_dir,
        ];
        for dir in required.iter() {
            fs::create_dir_all(dir).expect("create docker test dir");
        }
    }
}
// Builds the test image the first time it is called and returns its tag.
// Later calls return the cached tag without rebuilding.
// Panics if docker cannot be run or the build exits unsuccessfully.
fn ensure_test_image() -> String {
IMAGE_TAG
.get_or_init(|| {
let repo_root = repo_root();
// SANDBOX_AGENT_TEST_IMAGE overrides the default tag.
let image_tag = std::env::var("SANDBOX_AGENT_TEST_IMAGE")
.unwrap_or_else(|_| DEFAULT_IMAGE_TAG.to_string());
// docker build --tag <tag> --file <repo>/docker/test-agent/Dockerfile <repo>
let output = Command::new(docker_bin())
.args(["build", "--tag", &image_tag, "--file"])
.arg(
repo_root
.join("docker")
.join("test-agent")
.join("Dockerfile"),
)
.arg(&repo_root)
.output()
.expect("build sandbox-agent test image");
if !output.status.success() {
panic!(
"failed to build sandbox-agent test image: {}",
String::from_utf8_lossy(&output.stderr)
);
}
image_tag
})
.clone()
}
/// Assembles the environment passed to the test container.
///
/// Entries are applied in this order, later ones overriding earlier ones:
///
/// 1. The layout-owned directories (`HOME`, `USERPROFILE`, `XDG_DATA_HOME`,
///    `XDG_STATE_HOME`, `APPDATA`, `LOCALAPPDATA`).
/// 2. Host variables whose name starts with `SANDBOX_AGENT_` or
///    `OPENCODE_COMPAT_`, with localhost URLs rewritten for use inside the
///    container. Nothing else is forwarded from the host, so the layout-owned
///    keys above can never be overridden by it.
/// 3. `SANDBOX_AGENT_TEST_AUTH_TOKEN`, when `auth` carries a token.
/// 4. `PATH`: taken from `options.env` alone when `options.replace_path` is
///    set, otherwise the detected custom entries followed by `DEFAULT_PATH`.
/// 5. Every `options.env` entry except `PATH`, also URL-rewritten.
///
/// Host variables whose name or value is not valid Unicode are skipped rather
/// than aborting the test run.
fn build_env(
    layout: &TestLayout,
    auth: &AuthConfig,
    options: &TestAppOptions,
) -> BTreeMap<String, String> {
    let mut env = BTreeMap::new();

    let layout_dirs = [
        ("HOME", &layout.home),
        ("USERPROFILE", &layout.home),
        ("XDG_DATA_HOME", &layout.xdg_data_home),
        ("XDG_STATE_HOME", &layout.xdg_state_home),
        ("APPDATA", &layout.appdata),
        ("LOCALAPPDATA", &layout.local_appdata),
    ];
    for (key, dir) in layout_dirs {
        env.insert(key.to_string(), dir.to_string_lossy().into_owned());
    }

    // `vars_os` is used because `std::env::vars` panics on non-Unicode entries.
    for (raw_key, raw_value) in std::env::vars_os() {
        let (Some(key), Some(value)) = (raw_key.to_str(), raw_value.to_str()) else {
            continue;
        };
        if key.starts_with("SANDBOX_AGENT_") || key.starts_with("OPENCODE_COMPAT_") {
            env.insert(key.to_string(), rewrite_localhost_url(key, value));
        }
    }

    if let Some(token) = auth.token.as_ref() {
        env.insert("SANDBOX_AGENT_TEST_AUTH_TOKEN".to_string(), token.clone());
    }

    let path_value = if options.replace_path {
        options.env.get("PATH").cloned().unwrap_or_default()
    } else {
        let install_base = layout.install_dir.parent().expect("install base");
        let mut custom_entries = custom_path_entries(install_base);
        custom_entries.extend(explicit_path_entries());
        custom_entries.extend(
            options
                .extra_paths
                .iter()
                .filter(|path| path.is_absolute() && path.exists())
                .cloned(),
        );
        custom_entries.sort();
        custom_entries.dedup();

        if custom_entries.is_empty() {
            DEFAULT_PATH.to_string()
        } else {
            let joined = custom_entries
                .iter()
                .map(|path| path.to_string_lossy().into_owned())
                .collect::<Vec<_>>()
                .join(":");
            format!("{joined}:{DEFAULT_PATH}")
        }
    };
    env.insert("PATH".to_string(), path_value);

    // PATH was settled above; every other caller-supplied variable wins over
    // whatever has been collected so far.
    for (key, value) in &options.env {
        if key == "PATH" {
            continue;
        }
        env.insert(key.clone(), rewrite_localhost_url(key, value));
    }

    env
}
/// Collects the host directories that must be bind-mounted into the container.
///
/// The result always contains `root`, plus every absolute directory named by
/// the well-known directory variables in `env`, plus every absolute, existing
/// `PATH` entry that is not one of `STANDARD_PATHS`. The set is deduplicated
/// and returned in sorted order.
fn build_mounts(root: &Path, env: &BTreeMap<String, String>) -> Vec<PathBuf> {
    const DIR_KEYS: [&str; 7] = [
        "HOME",
        "USERPROFILE",
        "XDG_DATA_HOME",
        "XDG_STATE_HOME",
        "APPDATA",
        "LOCALAPPDATA",
        "SANDBOX_AGENT_DESKTOP_FAKE_STATE_DIR",
    ];

    let mut mounts: BTreeSet<PathBuf> = BTreeSet::new();
    mounts.insert(root.to_path_buf());

    mounts.extend(
        DIR_KEYS
            .iter()
            .filter_map(|key| env.get(*key))
            .map(|value| PathBuf::from(value))
            .filter(|path| path.is_absolute()),
    );

    if let Some(path_value) = env.get("PATH") {
        mounts.extend(
            path_value
                .split(':')
                .filter(|entry| !entry.is_empty() && !STANDARD_PATHS.contains(entry))
                .map(|entry| PathBuf::from(entry))
                .filter(|path| path.is_absolute() && path.exists()),
        );
    }

    mounts.into_iter().collect()
}
// Starts the test container in the background and returns the base URL of the
// server once it answers its health check.
//
// `container_id` becomes the container name, `mounts` are bind-mounted at the
// same path inside the container, and `env` is passed through `-e` flags.
// Panics if docker cannot be run, a docker command fails, or the health check
// times out.
// NOTE(review): on those panic paths nothing in this function removes the
// container; removal is done by `TestApp`'s `Drop`, which does not exist yet at
// this point — confirm whether a stuck container can be left behind.
fn run_container(
container_id: &str,
image: &str,
mounts: &[PathBuf],
env: &BTreeMap<String, String>,
auth: &AuthConfig,
) -> String {
let mut args = vec![
"run".to_string(),
"-d".to_string(),
"--rm".to_string(),
"--name".to_string(),
container_id.to_string(),
"-p".to_string(),
// No host port is given: the published port is looked up below via `docker port`.
format!("127.0.0.1::{CONTAINER_PORT}"),
];
// Run as the current effective user and group.
#[cfg(unix)]
{
args.push("--user".to_string());
args.push(format!("{}:{}", unsafe { libc::geteuid() }, unsafe {
libc::getegid()
}));
}
// On Linux, map host.docker.internal explicitly so rewritten localhost URLs resolve.
if cfg!(target_os = "linux") {
args.push("--add-host".to_string());
args.push("host.docker.internal:host-gateway".to_string());
}
for mount in mounts {
args.push("-v".to_string());
args.push(format!("{}:{}", mount.display(), mount.display()));
}
for (key, value) in env {
args.push("-e".to_string());
args.push(format!("{key}={value}"));
}
// Everything after the image name is the server's own command line.
args.push(image.to_string());
args.push("server".to_string());
args.push("--host".to_string());
args.push("0.0.0.0".to_string());
args.push("--port".to_string());
args.push(CONTAINER_PORT.to_string());
match auth.token.as_ref() {
Some(token) => {
args.push("--token".to_string());
args.push(token.clone());
}
None => args.push("--no-token".to_string()),
}
let output = Command::new(docker_bin())
.args(&args)
.output()
.expect("start docker test container");
if !output.status.success() {
panic!(
"failed to start docker test container: {}",
String::from_utf8_lossy(&output.stderr)
);
}
// Ask docker which host port the container port was published on.
let port_output = Command::new(docker_bin())
.args(["port", container_id, &format!("{CONTAINER_PORT}/tcp")])
.output()
.expect("resolve mapped docker port");
if !port_output.status.success() {
panic!(
"failed to resolve docker test port: {}",
String::from_utf8_lossy(&port_output.stderr)
);
}
let mapping = String::from_utf8(port_output.stdout)
.expect("docker port utf8")
.trim()
.to_string();
// The host port is whatever follows the last ':' of the reported mapping.
let host_port = mapping.rsplit(':').next().expect("mapped host port").trim();
let base_url = format!("http://127.0.0.1:{host_port}");
wait_for_health(&base_url, auth.token.as_deref());
base_url
}
fn wait_for_health(base_url: &str, token: Option<&str>) {
let started = SystemTime::now();
loop {
if probe_health(base_url, token) {
return;
}
if started
.elapsed()
.unwrap_or_else(|_| Duration::from_secs(0))
.gt(&Duration::from_secs(30))
{
panic!("timed out waiting for sandbox-agent docker test server");
}
thread::sleep(Duration::from_millis(200));
}
}
/// Sends one `GET /v1/health` request over a plain TCP connection and reports
/// whether the reply starts with an HTTP 200 status line.
///
/// `base_url` may carry an `http://` prefix, which is dropped to obtain the
/// socket address. When `token` is given it is sent as a bearer token. Any
/// connect, write or read failure yields `false`.
fn probe_health(base_url: &str, token: Option<&str>) -> bool {
    let address = base_url.strip_prefix("http://").unwrap_or(base_url);
    let Ok(mut stream) = TcpStream::connect(address) else {
        return false;
    };

    // Bound both directions so a stalled server cannot hang the probe.
    let io_timeout = Some(Duration::from_secs(2));
    let _ = stream.set_read_timeout(io_timeout);
    let _ = stream.set_write_timeout(io_timeout);

    let mut request =
        format!("GET /v1/health HTTP/1.1\r\nHost: {address}\r\nConnection: close\r\n");
    if let Some(token) = token {
        request += &format!("Authorization: Bearer {token}\r\n");
    }
    request += "\r\n";

    if stream.write_all(request.as_bytes()).is_err() {
        return false;
    }

    // `Connection: close` lets us read until the peer closes the socket.
    let mut response = String::new();
    if stream.read_to_string(&mut response).is_err() {
        return false;
    }

    ["HTTP/1.1 200", "HTTP/1.0 200"]
        .iter()
        .any(|status_line| response.starts_with(*status_line))
}
/// Returns the host `PATH` entries worth forwarding into the container.
///
/// An entry is kept when it lies under `root` or under the system temporary
/// directory, and it exists on disk. The result is sorted and deduplicated;
/// it is empty when `PATH` is unset.
fn custom_path_entries(root: &Path) -> Vec<PathBuf> {
    let Some(path_value) = std::env::var_os("PATH") else {
        return Vec::new();
    };

    // Resolved once up front instead of once per PATH entry.
    let temp_dir = std::env::temp_dir();

    let mut entries: Vec<PathBuf> = std::env::split_paths(&path_value)
        // Cheap prefix test first; the filesystem is only probed for candidates.
        .filter(|entry| entry.starts_with(root) || entry.starts_with(&temp_dir))
        .filter(|entry| entry.exists())
        .collect();
    entries.sort();
    entries.dedup();
    entries
}
/// Returns the directories listed in `SANDBOX_AGENT_TEST_EXTRA_PATHS`.
///
/// The variable is split like `PATH`; only absolute entries that exist on
/// disk are kept, in their original order. An unset variable yields an empty
/// list.
fn explicit_path_entries() -> Vec<PathBuf> {
    match std::env::var_os("SANDBOX_AGENT_TEST_EXTRA_PATHS") {
        Some(raw) => std::env::split_paths(&raw)
            .filter(|candidate| candidate.is_absolute() && candidate.exists())
            .collect(),
        None => Vec::new(),
    }
}
/// Rewrites host-local HTTP URLs so they stay reachable from the container.
///
/// Only values of variables whose name ends in `_URL` or `_URI` are touched:
/// every `http://127.0.0.1` and `http://localhost` occurrence becomes
/// `http://host.docker.internal`. All other values are returned unchanged.
fn rewrite_localhost_url(key: &str, value: &str) -> String {
    let names_a_url = ["_URL", "_URI"]
        .iter()
        .any(|suffix| key.ends_with(*suffix));
    if !names_a_url {
        return value.to_string();
    }

    let without_loopback_ip = value.replace("http://127.0.0.1", "http://host.docker.internal");
    without_loopback_ip.replace("http://localhost", "http://host.docker.internal")
}
/// Generates a container name that is distinct within this test process.
///
/// The name combines the process id, the current time in milliseconds since
/// the Unix epoch (0 if the clock reads earlier than the epoch) and a
/// per-process counter.
fn unique_container_id() -> String {
    let millis = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    let counter = CONTAINER_COUNTER.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    format!("sandbox-agent-test-{}-{millis}-{counter}", pid)
}
// Resolves the repository root: three directory levels above this crate's
// manifest directory, canonicalized. Panics if that path cannot be resolved.
fn repo_root() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("../../..")
.canonicalize()
.expect("repo root")
}
/// Returns the docker executable to use, resolved once per process.
///
/// Lookup order: `SANDBOX_AGENT_TEST_DOCKER_BIN` if it names an existing
/// path, then the first existing well-known install location, and finally
/// the bare name `docker` (left to be found through `PATH`).
fn docker_bin() -> &'static Path {
    const WELL_KNOWN: [&str; 3] = [
        "/usr/local/bin/docker",
        "/opt/homebrew/bin/docker",
        "/usr/bin/docker",
    ];

    let resolved = DOCKER_BIN.get_or_init(|| {
        let overridden = std::env::var_os("SANDBOX_AGENT_TEST_DOCKER_BIN")
            .map(PathBuf::from)
            .filter(|path| path.exists());

        overridden
            .or_else(|| {
                WELL_KNOWN
                    .iter()
                    .map(|candidate| PathBuf::from(*candidate))
                    .find(|path| path.exists())
            })
            .unwrap_or_else(|| PathBuf::from("docker"))
    });
    resolved.as_path()
}
// Unit tests for the environment-building helpers of this support module.
#[cfg(test)]
mod tests {
use super::*;
// Sets an environment variable for the lifetime of the guard and restores the
// previous value (or removes the variable) when dropped.
struct EnvVarGuard {
key: &'static str,
old: Option<std::ffi::OsString>,
}
impl EnvVarGuard {
// Remembers the current value of `key`, then overwrites it with `value`.
fn set(key: &'static str, value: &Path) -> Self {
let old = std::env::var_os(key);
std::env::set_var(key, value);
Self { key, old }
}
}
impl Drop for EnvVarGuard {
fn drop(&mut self) {
match self.old.as_ref() {
Some(value) => std::env::set_var(self.key, value),
None => std::env::remove_var(self.key),
}
}
}
// The container must see the test-local XDG_STATE_HOME even when the host
// process has its own XDG_STATE_HOME set.
// Marked #[serial] because it mutates the process environment.
#[test]
#[serial]
fn build_env_keeps_test_local_xdg_state_home() {
let root = tempfile::tempdir().expect("create docker support tempdir");
let host_state = tempfile::tempdir().expect("create host xdg state tempdir");
let _guard = EnvVarGuard::set("XDG_STATE_HOME", host_state.path());
let layout = TestLayout::new(root.path());
layout.create();
let env = build_env(&layout, &AuthConfig::disabled(), &TestAppOptions::default());
assert_eq!(
env.get("XDG_STATE_HOME"),
Some(&layout.xdg_state_home.to_string_lossy().to_string())
);
}
}

View file

@ -1,37 +1,14 @@
use std::fs;
use std::path::Path;
use axum::body::Body;
use axum::http::{Method, Request, StatusCode};
use futures::StreamExt;
use http_body_util::BodyExt;
use sandbox_agent::router::{build_router, AppState, AuthConfig};
use sandbox_agent_agent_management::agents::AgentManager;
use reqwest::{Method, StatusCode};
use sandbox_agent::router::AuthConfig;
use serde_json::{json, Value};
use tempfile::TempDir;
use tower::util::ServiceExt;
struct TestApp {
app: axum::Router,
_install_dir: TempDir,
}
impl TestApp {
fn with_setup<F>(setup: F) -> Self
where
F: FnOnce(&Path),
{
let install_dir = tempfile::tempdir().expect("create temp install dir");
setup(install_dir.path());
let manager = AgentManager::new(install_dir.path()).expect("create agent manager");
let state = AppState::new(AuthConfig::disabled(), manager);
let app = build_router(state);
Self {
app,
_install_dir: install_dir,
}
}
}
#[path = "support/docker.rs"]
mod docker_support;
use docker_support::TestApp;
fn write_executable(path: &Path, script: &str) {
fs::write(path, script).expect("write executable");
@ -101,28 +78,29 @@ fn setup_stub_agent_process_only(install_dir: &Path, agent: &str) {
}
async fn send_request(
app: &axum::Router,
app: &docker_support::DockerApp,
method: Method,
uri: &str,
body: Option<Value>,
) -> (StatusCode, Vec<u8>) {
let mut builder = Request::builder().method(method).uri(uri);
let request_body = if let Some(body) = body {
builder = builder.header("content-type", "application/json");
Body::from(body.to_string())
let client = reqwest::Client::new();
let response = if let Some(body) = body {
client
.request(method, app.http_url(uri))
.header("content-type", "application/json")
.body(body.to_string())
.send()
.await
.expect("request handled")
} else {
Body::empty()
client
.request(method, app.http_url(uri))
.send()
.await
.expect("request handled")
};
let request = builder.body(request_body).expect("build request");
let response = app.clone().oneshot(request).await.expect("request handled");
let status = response.status();
let bytes = response
.into_body()
.collect()
.await
.expect("collect body")
.to_bytes();
let bytes = response.bytes().await.expect("collect body");
(status, bytes.to_vec())
}
@ -145,7 +123,7 @@ async fn agent_process_matrix_smoke_and_jsonrpc_conformance() {
.chain(agent_process_only_agents.iter())
.copied()
.collect();
let test_app = TestApp::with_setup(|install_dir| {
let test_app = TestApp::with_setup(AuthConfig::disabled(), |install_dir| {
for agent in native_agents {
setup_stub_artifacts(install_dir, agent);
}
@ -201,21 +179,15 @@ async fn agent_process_matrix_smoke_and_jsonrpc_conformance() {
assert_eq!(new_json["id"], 2, "{agent}: session/new id");
assert_eq!(new_json["result"]["echoedMethod"], "session/new");
let request = Request::builder()
.method(Method::GET)
.uri(format!("/v1/acp/{agent}-server"))
.body(Body::empty())
.expect("build sse request");
let response = test_app
.app
.clone()
.oneshot(request)
let response = reqwest::Client::new()
.get(test_app.app.http_url(&format!("/v1/acp/{agent}-server")))
.header("accept", "text/event-stream")
.send()
.await
.expect("sse response");
assert_eq!(response.status(), StatusCode::OK);
let mut stream = response.into_body().into_data_stream();
let mut stream = response.bytes_stream();
let chunk = tokio::time::timeout(std::time::Duration::from_secs(5), async move {
while let Some(item) = stream.next().await {
let bytes = item.expect("sse chunk");

View file

@ -1,128 +1,19 @@
use std::fs;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::time::Duration;
use axum::body::Body;
use axum::http::{header, HeaderMap, Method, Request, StatusCode};
use axum::Router;
use futures::StreamExt;
use http_body_util::BodyExt;
use sandbox_agent::router::{build_router, AppState, AuthConfig};
use sandbox_agent_agent_management::agents::AgentManager;
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};
use reqwest::{Method, StatusCode};
use sandbox_agent::router::AuthConfig;
use serde_json::{json, Value};
use serial_test::serial;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tower::util::ServiceExt;
/// Test fixture pairing a `Router` with the temporary directory used as the agent install
/// location.
struct TestApp {
// Router that the tests in this file send requests to.
app: Router,
// Temporary install directory; presumably kept here so it is not removed while `app` is in
// use — TODO confirm against `tempfile::TempDir` drop semantics.
install_dir: TempDir,
}
impl TestApp {
fn new(auth: AuthConfig) -> Self {
Self::with_setup(auth, |_| {})
}
fn with_setup<F>(auth: AuthConfig, setup: F) -> Self
where
F: FnOnce(&Path),
{
let install_dir = tempfile::tempdir().expect("create temp install dir");
setup(install_dir.path());
let manager = AgentManager::new(install_dir.path()).expect("create agent manager");
let state = AppState::new(auth, manager);
let app = build_router(state);
Self { app, install_dir }
}
fn install_path(&self) -> &Path {
self.install_dir.path()
}
}
/// Records an environment variable's name together with the value it had before being
/// overridden.
struct EnvVarGuard {
// Name of the environment variable being tracked.
key: &'static str,
// Value the variable held beforehand; `None` when it was not set.
previous: Option<std::ffi::OsString>,
}
/// Handle to a server task listening on a local socket, with the pieces needed to stop it.
struct LiveServer {
// Socket address the server is bound to; used to build request URLs.
address: SocketAddr,
// One-shot sender used to request shutdown; wrapped in `Option` so it can be taken and
// consumed once.
shutdown_tx: Option<oneshot::Sender<()>>,
// Join handle of the spawned task that runs the server.
task: JoinHandle<()>,
}
impl LiveServer {
async fn spawn(app: Router) -> Self {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind live server");
let address = listener.local_addr().expect("live server address");
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let task = tokio::spawn(async move {
let server =
axum::serve(listener, app.into_make_service()).with_graceful_shutdown(async {
let _ = shutdown_rx.await;
});
let _ = server.await;
});
Self {
address,
shutdown_tx: Some(shutdown_tx),
task,
}
}
fn http_url(&self, path: &str) -> String {
format!("http://{}{}", self.address, path)
}
fn ws_url(&self, path: &str) -> String {
format!("ws://{}{}", self.address, path)
}
async fn shutdown(mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
let _ = tokio::time::timeout(Duration::from_secs(3), async {
let _ = self.task.await;
})
.await;
}
}
impl EnvVarGuard {
    /// Overrides `key` with a UTF-8 `value`, remembering what was there before.
    fn set(key: &'static str, value: &str) -> Self {
        Self::set_os(key, std::ffi::OsStr::new(value))
    }

    /// Overrides `key` with an OS-string `value`, remembering what was there before.
    fn set_os(key: &'static str, value: &std::ffi::OsStr) -> Self {
        // Capture the old value before it is overwritten.
        let guard = Self {
            key,
            previous: std::env::var_os(key),
        };
        std::env::set_var(key, value);
        guard
    }
}
impl Drop for EnvVarGuard {
    /// Puts the variable back to its recorded value, or removes it when there was none.
    fn drop(&mut self) {
        match self.previous.as_ref() {
            Some(original) => std::env::set_var(self.key, original),
            None => std::env::remove_var(self.key),
        }
    }
}
#[path = "support/docker.rs"]
mod docker_support;
use docker_support::{LiveServer, TestApp};
fn write_executable(path: &Path, script: &str) {
fs::write(path, script).expect("write executable");
@ -168,17 +59,18 @@ exit 0
}
fn serve_registry_once(document: Value) -> String {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind registry server");
let address = listener.local_addr().expect("registry address");
let listener = TcpListener::bind("0.0.0.0:0").expect("bind registry server");
let port = listener.local_addr().expect("registry address").port();
let body = document.to_string();
std::thread::spawn(move || {
if let Ok((mut stream, _)) = listener.accept() {
respond_json(&mut stream, &body);
std::thread::spawn(move || loop {
match listener.accept() {
Ok((mut stream, _)) => respond_json(&mut stream, &body),
Err(_) => break,
}
});
format!("http://{address}/registry.json")
format!("http://127.0.0.1:{port}/registry.json")
}
fn respond_json(stream: &mut TcpStream, body: &str) {
@ -196,74 +88,96 @@ fn respond_json(stream: &mut TcpStream, body: &str) {
}
async fn send_request(
app: &Router,
app: &docker_support::DockerApp,
method: Method,
uri: &str,
body: Option<Value>,
headers: &[(&str, &str)],
) -> (StatusCode, HeaderMap, Vec<u8>) {
let mut builder = Request::builder().method(method).uri(uri);
let client = reqwest::Client::new();
let mut builder = client.request(method, app.http_url(uri));
for (name, value) in headers {
builder = builder.header(*name, *value);
let header_name = HeaderName::from_bytes(name.as_bytes()).expect("header name");
let header_value = HeaderValue::from_str(value).expect("header value");
builder = builder.header(header_name, header_value);
}
let request_body = if let Some(body) = body {
builder = builder.header(header::CONTENT_TYPE, "application/json");
Body::from(body.to_string())
let response = if let Some(body) = body {
builder
.header(header::CONTENT_TYPE, "application/json")
.body(body.to_string())
.send()
.await
.expect("request handled")
} else {
Body::empty()
builder.send().await.expect("request handled")
};
let request = builder.body(request_body).expect("build request");
let response = app.clone().oneshot(request).await.expect("request handled");
let status = response.status();
let headers = response.headers().clone();
let bytes = response
.into_body()
.collect()
.await
.expect("collect body")
.to_bytes();
let bytes = response.bytes().await.expect("collect body");
(status, headers, bytes.to_vec())
}
async fn send_request_raw(
app: &Router,
app: &docker_support::DockerApp,
method: Method,
uri: &str,
body: Option<Vec<u8>>,
headers: &[(&str, &str)],
content_type: Option<&str>,
) -> (StatusCode, HeaderMap, Vec<u8>) {
let mut builder = Request::builder().method(method).uri(uri);
let client = reqwest::Client::new();
let mut builder = client.request(method, app.http_url(uri));
for (name, value) in headers {
builder = builder.header(*name, *value);
let header_name = HeaderName::from_bytes(name.as_bytes()).expect("header name");
let header_value = HeaderValue::from_str(value).expect("header value");
builder = builder.header(header_name, header_value);
}
let request_body = if let Some(body) = body {
let response = if let Some(body) = body {
if let Some(content_type) = content_type {
builder = builder.header(header::CONTENT_TYPE, content_type);
}
Body::from(body)
builder.body(body).send().await.expect("request handled")
} else {
Body::empty()
builder.send().await.expect("request handled")
};
let request = builder.body(request_body).expect("build request");
let response = app.clone().oneshot(request).await.expect("request handled");
let status = response.status();
let headers = response.headers().clone();
let bytes = response
.into_body()
.collect()
.await
.expect("collect body")
.to_bytes();
let bytes = response.bytes().await.expect("collect body");
(status, headers, bytes.to_vec())
}
/// Runs a shell script through `POST /v1/processes/run` (with `DISPLAY` set to `display`)
/// that starts an xterm titled "Sandbox Desktop Test" and polls xdotool until the window is
/// visible, then activates it.
///
/// Panics unless the request returns 200 and the script reports exit code 0.
async fn launch_desktop_focus_window(app: &docker_support::DockerApp, display: &str) {
    // Background xterm, then up to 50 polls 0.1s apart for a visible window with that title.
    let script = r#"nohup xterm -geometry 80x24+40+40 -title 'Sandbox Desktop Test' -e sh -lc 'sleep 60' >/tmp/sandbox-agent-xterm.log 2>&1 < /dev/null & for _ in $(seq 1 50); do wid="$(xdotool search --onlyvisible --name 'Sandbox Desktop Test' 2>/dev/null | head -n 1 || true)"; if [ -n "$wid" ]; then xdotool windowactivate "$wid"; exit 0; fi; sleep 0.1; done; exit 1"#;

    let payload = json!({
        "command": "sh",
        "args": ["-lc", script],
        "env": {
            "DISPLAY": display,
        },
        "timeoutMs": 10_000
    });

    let (status, _headers, raw) =
        send_request(app, Method::POST, "/v1/processes/run", Some(payload), &[]).await;

    assert_eq!(
        status,
        StatusCode::OK,
        "unexpected desktop focus window launch response: {}",
        String::from_utf8_lossy(&raw)
    );
    assert_eq!(parse_json(&raw)["exitCode"], 0);
}
fn parse_json(bytes: &[u8]) -> Value {
if bytes.is_empty() {
Value::Null
@ -284,7 +198,7 @@ fn initialize_payload() -> Value {
})
}
async fn bootstrap_server(app: &Router, server_id: &str, agent: &str) {
async fn bootstrap_server(app: &docker_support::DockerApp, server_id: &str, agent: &str) {
let initialize = initialize_payload();
let (status, _, _body) = send_request(
app,
@ -297,17 +211,17 @@ async fn bootstrap_server(app: &Router, server_id: &str, agent: &str) {
assert_eq!(status, StatusCode::OK);
}
async fn read_first_sse_data(app: &Router, server_id: &str) -> String {
let request = Request::builder()
.method(Method::GET)
.uri(format!("/v1/acp/{server_id}"))
.body(Body::empty())
.expect("build request");
let response = app.clone().oneshot(request).await.expect("sse response");
async fn read_first_sse_data(app: &docker_support::DockerApp, server_id: &str) -> String {
let client = reqwest::Client::new();
let response = client
.get(app.http_url(&format!("/v1/acp/{server_id}")))
.header("accept", "text/event-stream")
.send()
.await
.expect("sse response");
assert_eq!(response.status(), StatusCode::OK);
let mut stream = response.into_body().into_data_stream();
let mut stream = response.bytes_stream();
tokio::time::timeout(Duration::from_secs(5), async move {
while let Some(chunk) = stream.next().await {
let bytes = chunk.expect("stream chunk");
@ -323,21 +237,21 @@ async fn read_first_sse_data(app: &Router, server_id: &str) -> String {
}
async fn read_first_sse_data_with_last_id(
app: &Router,
app: &docker_support::DockerApp,
server_id: &str,
last_event_id: u64,
) -> String {
let request = Request::builder()
.method(Method::GET)
.uri(format!("/v1/acp/{server_id}"))
let client = reqwest::Client::new();
let response = client
.get(app.http_url(&format!("/v1/acp/{server_id}")))
.header("accept", "text/event-stream")
.header("last-event-id", last_event_id.to_string())
.body(Body::empty())
.expect("build request");
let response = app.clone().oneshot(request).await.expect("sse response");
.send()
.await
.expect("sse response");
assert_eq!(response.status(), StatusCode::OK);
let mut stream = response.into_body().into_data_stream();
let mut stream = response.bytes_stream();
tokio::time::timeout(Duration::from_secs(5), async move {
while let Some(chunk) = stream.next().await {
let bytes = chunk.expect("stream chunk");
@ -375,5 +289,7 @@ mod acp_transport;
mod config_endpoints;
#[path = "v1_api/control_plane.rs"]
mod control_plane;
#[path = "v1_api/desktop.rs"]
mod desktop;
#[path = "v1_api/processes.rs"]
mod processes;

View file

@ -22,8 +22,9 @@ async fn mcp_config_requires_directory_and_name() {
#[tokio::test]
async fn mcp_config_crud_round_trip() {
let test_app = TestApp::new(AuthConfig::disabled());
let project = tempfile::tempdir().expect("tempdir");
let directory = project.path().to_string_lossy().to_string();
let project = test_app.root_path().join("mcp-config-project");
fs::create_dir_all(&project).expect("create project dir");
let directory = project.to_string_lossy().to_string();
let entry = json!({
"type": "local",
@ -99,8 +100,9 @@ async fn skills_config_requires_directory_and_name() {
#[tokio::test]
async fn skills_config_crud_round_trip() {
let test_app = TestApp::new(AuthConfig::disabled());
let project = tempfile::tempdir().expect("tempdir");
let directory = project.path().to_string_lossy().to_string();
let project = test_app.root_path().join("skills-config-project");
fs::create_dir_all(&project).expect("create project dir");
let directory = project.to_string_lossy().to_string();
let entry = json!({
"sources": [

View file

@ -1,4 +1,5 @@
use super::*;
use std::collections::BTreeMap;
#[tokio::test]
async fn v1_health_removed_legacy_and_opencode_unmounted() {
@ -137,10 +138,19 @@ async fn v1_filesystem_endpoints_round_trip() {
#[tokio::test]
#[serial]
async fn require_preinstall_blocks_missing_agent() {
let test_app = {
let _preinstall = EnvVarGuard::set("SANDBOX_AGENT_REQUIRE_PREINSTALL", "true");
TestApp::new(AuthConfig::disabled())
};
let mut env = BTreeMap::new();
env.insert(
"SANDBOX_AGENT_REQUIRE_PREINSTALL".to_string(),
"true".to_string(),
);
let test_app = TestApp::with_options(
AuthConfig::disabled(),
docker_support::TestAppOptions {
env,
..Default::default()
},
|_| {},
);
let (status, _, body) = send_request(
&test_app.app,
@ -176,20 +186,26 @@ async fn lazy_install_runs_on_first_bootstrap() {
]
}));
let _registry = EnvVarGuard::set("SANDBOX_AGENT_ACP_REGISTRY_URL", &registry_url);
let test_app = TestApp::with_setup(AuthConfig::disabled(), |install_path| {
fs::create_dir_all(install_path.join("agent_processes"))
.expect("create agent processes dir");
write_executable(&install_path.join("codex"), "#!/usr/bin/env sh\nexit 0\n");
fs::create_dir_all(install_path.join("bin")).expect("create bin dir");
write_fake_npm(&install_path.join("bin").join("npm"));
});
let helper_bin_root = tempfile::tempdir().expect("helper bin tempdir");
let helper_bin = helper_bin_root.path().join("bin");
fs::create_dir_all(&helper_bin).expect("create helper bin dir");
write_fake_npm(&helper_bin.join("npm"));
let original_path = std::env::var_os("PATH").unwrap_or_default();
let mut paths = vec![test_app.install_path().join("bin")];
paths.extend(std::env::split_paths(&original_path));
let merged_path = std::env::join_paths(paths).expect("join PATH");
let _path_guard = EnvVarGuard::set_os("PATH", merged_path.as_os_str());
let mut env = BTreeMap::new();
env.insert("SANDBOX_AGENT_ACP_REGISTRY_URL".to_string(), registry_url);
let test_app = TestApp::with_options(
AuthConfig::disabled(),
docker_support::TestAppOptions {
env,
extra_paths: vec![helper_bin.clone()],
..Default::default()
},
|install_path| {
fs::create_dir_all(install_path.join("agent_processes"))
.expect("create agent processes dir");
write_executable(&install_path.join("codex"), "#!/usr/bin/env sh\nexit 0\n");
},
);
let (status, _, _) = send_request(
&test_app.app,

View file

@ -0,0 +1,494 @@
use super::*;
use futures::{SinkExt, StreamExt};
use serial_test::serial;
use std::collections::BTreeMap;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
/// Extracts `(width, height)` from the IHDR chunk at the start of a PNG image.
///
/// After the 8-byte signature comes a 4-byte chunk length and the 4-byte chunk type `IHDR`;
/// width and height follow as big-endian `u32`s at byte offsets 16 and 20.
///
/// # Panics
///
/// Panics with a descriptive message when `bytes` lacks the PNG signature, is too short to
/// hold the IHDR dimensions, or does not begin with an IHDR chunk.
fn png_dimensions(bytes: &[u8]) -> (u32, u32) {
    assert!(
        bytes.starts_with(b"\x89PNG\r\n\x1a\n"),
        "missing PNG signature"
    );

    // Bytes 8..24: chunk length (4), chunk type (4), width (4), height (4).
    let header = bytes
        .get(8..24)
        .expect("png too short to contain IHDR width and height");
    assert_eq!(&header[4..8], &b"IHDR"[..], "first PNG chunk must be IHDR");

    let width = u32::from_be_bytes([header[8], header[9], header[10], header[11]]);
    let height = u32::from_be_bytes([header[12], header[13], header[14], header[15]]);
    (width, height)
}
/// Waits up to five seconds for the next frame on `ws` and returns it.
///
/// Panics if the wait times out, the stream has ended, or the frame is an error.
async fn recv_ws_message(
    ws: &mut tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >,
) -> Message {
    let waited = tokio::time::timeout(Duration::from_secs(5), ws.next()).await;
    let next_item = waited.expect("timed out waiting for websocket frame");
    let frame = next_item.expect("websocket stream ended");
    frame.expect("websocket frame")
}
/// With `PATH` replaced by a freshly created temp directory, `GET /v1/desktop/status` must
/// report `install_required`, list `Xvfb` among the missing dependencies, and suggest the
/// desktop install command.
#[tokio::test]
#[serial]
async fn v1_desktop_status_reports_install_required_when_dependencies_are_missing() {
    let temp = tempfile::tempdir().expect("create empty path tempdir");
    let env = BTreeMap::from([(
        "PATH".to_string(),
        temp.path().to_string_lossy().to_string(),
    )]);
    let options = docker_support::TestAppOptions {
        env,
        replace_path: true,
        ..Default::default()
    };
    let test_app = TestApp::with_options(AuthConfig::disabled(), options, |_| {});

    let (status, _, body) =
        send_request(&test_app.app, Method::GET, "/v1/desktop/status", None, &[]).await;
    assert_eq!(status, StatusCode::OK);

    let parsed = parse_json(&body);
    assert_eq!(parsed["state"], "install_required");
    assert!(parsed["missingDependencies"]
        .as_array()
        .expect("missingDependencies array")
        .iter()
        .any(|value| value == "Xvfb"));
    assert_eq!(
        parsed["installCommand"],
        "sandbox-agent install desktop --yes"
    );
}
/// End-to-end walk through the desktop API: start, screenshots, display info, mouse actions,
/// window listing, keyboard actions, recording, streaming over WebSocket, and teardown.
/// The steps share server-side state, so their order matters.
#[tokio::test]
#[serial]
async fn v1_desktop_lifecycle_and_actions_work_with_real_runtime() {
let test_app = TestApp::new(AuthConfig::disabled());
// Start the desktop at 1440x900 / 96 dpi and check the reported state and resolution.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/start",
Some(json!({
"width": 1440,
"height": 900,
"dpi": 96
})),
&[],
)
.await;
assert_eq!(
status,
StatusCode::OK,
"unexpected start response: {}",
String::from_utf8_lossy(&body)
);
let parsed = parse_json(&body);
assert_eq!(parsed["state"], "active");
// The display string is kept for later comparisons and for launching a window on it.
let display = parsed["display"]
.as_str()
.expect("desktop display")
.to_string();
assert!(display.starts_with(':'));
assert_eq!(parsed["resolution"]["width"], 1440);
assert_eq!(parsed["resolution"]["height"], 900);
// Default screenshot: PNG at the full configured resolution.
let (status, headers, body) = send_request_raw(
&test_app.app,
Method::GET,
"/v1/desktop/screenshot",
None,
&[],
None,
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(
headers
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some("image/png")
);
assert!(body.starts_with(b"\x89PNG\r\n\x1a\n"));
assert_eq!(png_dimensions(&body), (1440, 900));
// JPEG screenshot: checked by content type and the leading ff d8 ff bytes.
let (status, headers, body) = send_request_raw(
&test_app.app,
Method::GET,
"/v1/desktop/screenshot?format=jpeg&quality=50",
None,
&[],
None,
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(
headers
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some("image/jpeg")
);
assert!(body.starts_with(&[0xff, 0xd8, 0xff]));
// Scaled screenshot: scale=0.5 is expected to halve both dimensions.
let (status, headers, body) = send_request_raw(
&test_app.app,
Method::GET,
"/v1/desktop/screenshot?scale=0.5",
None,
&[],
None,
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(
headers
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some("image/png")
);
assert_eq!(png_dimensions(&body), (720, 450));
// Region screenshot: only the PNG signature is checked, not the resulting size.
let (status, _, body) = send_request_raw(
&test_app.app,
Method::GET,
"/v1/desktop/screenshot/region?x=10&y=20&width=30&height=40",
None,
&[],
None,
)
.await;
assert_eq!(status, StatusCode::OK);
assert!(body.starts_with(b"\x89PNG\r\n\x1a\n"));
// Display info must agree with what the start call returned.
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/desktop/display/info",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let display_info = parse_json(&body);
assert_eq!(display_info["display"], display);
assert_eq!(display_info["resolution"]["width"], 1440);
// Mouse move: the response echoes the new pointer position.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/move",
Some(json!({ "x": 400, "y": 300 })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let mouse = parse_json(&body);
assert_eq!(mouse["x"], 400);
assert_eq!(mouse["y"], 300);
// Mouse drag: the reported position is the drag's end point.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/drag",
Some(json!({
"startX": 100,
"startY": 110,
"endX": 220,
"endY": 230,
"button": "left"
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let dragged = parse_json(&body);
assert_eq!(dragged["x"], 220);
assert_eq!(dragged["y"], 230);
// Single left click at the drag end point.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/click",
Some(json!({
"x": 220,
"y": 230,
"button": "left",
"clickCount": 1
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let clicked = parse_json(&body);
assert_eq!(clicked["x"], 220);
assert_eq!(clicked["y"], 230);
// Manual press / move / release sequence with the left button.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/down",
Some(json!({
"x": 220,
"y": 230,
"button": "left"
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let mouse_down = parse_json(&body);
assert_eq!(mouse_down["x"], 220);
assert_eq!(mouse_down["y"], 230);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/move",
Some(json!({ "x": 260, "y": 280 })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let moved_while_down = parse_json(&body);
assert_eq!(moved_while_down["x"], 260);
assert_eq!(moved_while_down["y"], 280);
// Release without coordinates: the response is expected to report the last moved-to position.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/up",
Some(json!({ "button": "left" })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let mouse_up = parse_json(&body);
assert_eq!(mouse_up["x"], 260);
assert_eq!(mouse_up["y"], 280);
// Scroll at (220, 230); the response reports that position.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/scroll",
Some(json!({
"x": 220,
"y": 230,
"deltaY": -3
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let scrolled = parse_json(&body);
assert_eq!(scrolled["x"], 220);
assert_eq!(scrolled["y"], 230);
// Window listing: only the response shape is checked.
let (status, _, body) =
send_request(&test_app.app, Method::GET, "/v1/desktop/windows", None, &[]).await;
assert_eq!(status, StatusCode::OK);
assert!(parse_json(&body)["windows"].is_array());
// The pointer is expected to still be where the scroll request placed it.
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/desktop/mouse/position",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let position = parse_json(&body);
assert_eq!(position["x"], 220);
assert_eq!(position["y"], 230);
// Open and activate a window before the keyboard calls, presumably so the keystrokes
// have a focused target — TODO confirm.
launch_desktop_focus_window(&test_app.app, &display).await;
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/keyboard/type",
Some(json!({ "text": "hello world", "delayMs": 5 })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["ok"], true);
// Key press given as a single combined string.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/keyboard/press",
Some(json!({ "key": "ctrl+l" })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["ok"], true);
// Key press given as a key plus a separate modifiers object.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/keyboard/press",
Some(json!({
"key": "l",
"modifiers": {
"ctrl": true
}
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["ok"], true);
// Explicit key down followed by key up for the same key.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/keyboard/down",
Some(json!({ "key": "shift" })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["ok"], true);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/keyboard/up",
Some(json!({ "key": "shift" })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["ok"], true);
// Recording: start at 8 fps and remember the id for the stop/download/delete calls.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/recording/start",
Some(json!({ "fps": 8 })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let recording = parse_json(&body);
let recording_id = recording["id"].as_str().expect("recording id").to_string();
assert_eq!(recording["status"], "recording");
// Presumably gives the recorder time to capture some frames before stopping.
tokio::time::sleep(Duration::from_secs(2)).await;
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/recording/stop",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let stopped_recording = parse_json(&body);
assert_eq!(stopped_recording["id"], recording_id);
assert_eq!(stopped_recording["status"], "completed");
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/desktop/recordings",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert!(parse_json(&body)["recordings"].is_array());
// Download: expects an MP4 content type and the `ftyp` tag somewhere in the body.
let (status, headers, body) = send_request_raw(
&test_app.app,
Method::GET,
&format!("/v1/desktop/recordings/{recording_id}/download"),
None,
&[],
None,
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(
headers
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some("video/mp4")
);
assert!(body.windows(4).any(|window| window == b"ftyp"));
// Streaming: start the stream, then connect to its WebSocket endpoint.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/stream/start",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["active"], true);
let (mut ws, _) = connect_async(test_app.app.ws_url("/v1/desktop/stream/ws"))
.await
.expect("connect desktop stream websocket");
// First frame must be a JSON text frame of type "ready" carrying the resolution.
let ready = recv_ws_message(&mut ws).await;
match ready {
Message::Text(text) => {
let value: Value = serde_json::from_str(&text).expect("desktop stream ready frame");
assert_eq!(value["type"], "ready");
assert_eq!(value["width"], 1440);
assert_eq!(value["height"], 900);
}
other => panic!("expected text ready frame, got {other:?}"),
}
// Second frame must be binary and begin with the ff d8 ff bytes checked for JPEG above.
let frame = recv_ws_message(&mut ws).await;
match frame {
Message::Binary(bytes) => assert!(bytes.starts_with(&[0xff, 0xd8, 0xff])),
other => panic!("expected binary jpeg frame, got {other:?}"),
}
// Send an input message over the socket; only the send itself is checked.
ws.send(Message::Text(
json!({
"type": "moveMouse",
"x": 320,
"y": 330
})
.to_string()
.into(),
))
.await
.expect("send desktop stream mouse move");
// Close errors are ignored.
let _ = ws.close(None).await;
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/stream/stop",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["active"], false);
// Teardown: delete the recording (204 expected), then stop the desktop.
let (status, _, _) = send_request(
&test_app.app,
Method::DELETE,
&format!("/v1/desktop/recordings/{recording_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT);
let (status, _, body) =
send_request(&test_app.app, Method::POST, "/v1/desktop/stop", None, &[]).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["state"], "inactive");
}

View file

@ -2,6 +2,7 @@ use super::*;
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine;
use futures::{SinkExt, StreamExt};
use serial_test::serial;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
@ -277,6 +278,98 @@ async fn v1_process_tty_input_and_logs() {
assert_eq!(status, StatusCode::NO_CONTENT);
}
/// Checks that `GET /v1/processes?owner=...` returns only processes of the requested owner:
/// a process created through the API appears under `user`, and processes present after
/// starting the desktop appear under `desktop`.
#[tokio::test]
#[serial]
async fn v1_processes_owner_filter_separates_user_and_desktop_processes() {
let test_app = TestApp::new(AuthConfig::disabled());
// Create a long-sleeping process through the API so it is still listed during the checks.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "sh",
"args": ["-lc", "sleep 30"],
"tty": false,
"interactive": false
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let user_process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
// Start the desktop so that desktop-owned processes exist.
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/start",
Some(json!({
"width": 1024,
"height": 768
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["state"], "active");
// owner=user: must include the process created above and nothing with another owner.
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/processes?owner=user",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
// A missing or non-array "processes" field becomes an empty list, so the `any` check
// below fails rather than this line panicking.
let user_processes = parse_json(&body)["processes"]
.as_array()
.cloned()
.unwrap_or_default();
assert!(user_processes
.iter()
.any(|process| process["id"] == user_process_id));
assert!(user_processes
.iter()
.all(|process| process["owner"] == "user"));
// owner=desktop: at least two entries are expected, all owned by "desktop".
// NOTE(review): which runtime processes make up that minimum is not visible here.
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/processes?owner=desktop",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let desktop_processes = parse_json(&body)["processes"]
.as_array()
.cloned()
.unwrap_or_default();
assert!(desktop_processes.len() >= 2);
assert!(desktop_processes
.iter()
.all(|process| process["owner"] == "desktop"));
// Cleanup: kill the user process, then stop the desktop.
let (status, _, _) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{user_process_id}/kill"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, _, body) =
send_request(&test_app.app, Method::POST, "/v1/desktop/stop", None, &[]).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["state"], "inactive");
}
#[tokio::test]
async fn v1_process_not_found_returns_404() {
let test_app = TestApp::new(AuthConfig::disabled());
@ -413,22 +506,17 @@ async fn v1_process_logs_follow_sse_streams_entries() {
.expect("process id")
.to_string();
let request = Request::builder()
.method(Method::GET)
.uri(format!(
let response = reqwest::Client::new()
.get(test_app.app.http_url(&format!(
"/v1/processes/{process_id}/logs?stream=stdout&follow=true"
))
.body(Body::empty())
.expect("build request");
let response = test_app
.app
.clone()
.oneshot(request)
)))
.header("accept", "text/event-stream")
.send()
.await
.expect("sse response");
assert_eq!(response.status(), StatusCode::OK);
let mut stream = response.into_body().into_data_stream();
let mut stream = response.bytes_stream();
let chunk = tokio::time::timeout(Duration::from_secs(5), async move {
while let Some(chunk) = stream.next().await {
let bytes = chunk.expect("stream chunk");