feat: expand desktop computer-use APIs

This commit is contained in:
Nathan Flurry 2026-03-15 17:51:58 -07:00
parent 96dcc3d5f9
commit e638148345
43 changed files with 6359 additions and 493 deletions

View file

@ -105,6 +105,7 @@ fn desktop_packages(package_manager: DesktopPackageManager, no_fonts: bool) -> V
"openbox",
"xdotool",
"imagemagick",
"ffmpeg",
"x11-xserver-utils",
"dbus-x11",
"xauth",
@ -115,6 +116,7 @@ fn desktop_packages(package_manager: DesktopPackageManager, no_fonts: bool) -> V
"openbox",
"xdotool",
"ImageMagick",
"ffmpeg",
"xrandr",
"dbus-x11",
"xauth",
@ -125,6 +127,7 @@ fn desktop_packages(package_manager: DesktopPackageManager, no_fonts: bool) -> V
"openbox",
"xdotool",
"imagemagick",
"ffmpeg",
"xrandr",
"dbus",
"xauth",

View file

@ -0,0 +1,309 @@
use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::Mutex;
use sandbox_agent_error::SandboxError;
use crate::desktop_types::{
DesktopRecordingInfo, DesktopRecordingListResponse, DesktopRecordingStartRequest,
DesktopRecordingStatus, DesktopResolution,
};
use crate::process_runtime::{ProcessOwner, ProcessRuntime, ProcessStartSpec, ProcessStatus, RestartPolicy};
#[derive(Debug, Clone)]
pub struct DesktopRecordingContext {
pub display: String,
pub environment: std::collections::HashMap<String, String>,
pub resolution: DesktopResolution,
}
#[derive(Debug, Clone)]
pub struct DesktopRecordingManager {
process_runtime: Arc<ProcessRuntime>,
recordings_dir: PathBuf,
inner: Arc<Mutex<DesktopRecordingState>>,
}
#[derive(Debug, Default)]
struct DesktopRecordingState {
next_id: u64,
current_id: Option<String>,
recordings: BTreeMap<String, RecordingEntry>,
}
#[derive(Debug, Clone)]
struct RecordingEntry {
info: DesktopRecordingInfo,
path: PathBuf,
}
impl DesktopRecordingManager {
pub fn new(process_runtime: Arc<ProcessRuntime>, state_dir: PathBuf) -> Self {
Self {
process_runtime,
recordings_dir: state_dir.join("recordings"),
inner: Arc::new(Mutex::new(DesktopRecordingState::default())),
}
}
pub async fn start(
&self,
context: DesktopRecordingContext,
request: DesktopRecordingStartRequest,
) -> Result<DesktopRecordingInfo, SandboxError> {
if find_binary("ffmpeg").is_none() {
return Err(SandboxError::Conflict {
message: "ffmpeg is required for desktop recording".to_string(),
});
}
self.ensure_recordings_dir()?;
{
let mut state = self.inner.lock().await;
self.refresh_locked(&mut state).await?;
if state.current_id.is_some() {
return Err(SandboxError::Conflict {
message: "a desktop recording is already active".to_string(),
});
}
}
let mut state = self.inner.lock().await;
let id_num = state.next_id + 1;
state.next_id = id_num;
let id = format!("rec_{id_num}");
let file_name = format!("{id}.mp4");
let path = self.recordings_dir.join(&file_name);
let fps = request.fps.unwrap_or(30).clamp(1, 60);
let args = vec![
"-y".to_string(),
"-video_size".to_string(),
format!("{}x{}", context.resolution.width, context.resolution.height),
"-framerate".to_string(),
fps.to_string(),
"-f".to_string(),
"x11grab".to_string(),
"-i".to_string(),
context.display,
"-c:v".to_string(),
"libx264".to_string(),
"-preset".to_string(),
"ultrafast".to_string(),
"-pix_fmt".to_string(),
"yuv420p".to_string(),
path.to_string_lossy().to_string(),
];
let snapshot = self
.process_runtime
.start_process(ProcessStartSpec {
command: "ffmpeg".to_string(),
args,
cwd: None,
env: context.environment,
tty: false,
interactive: false,
owner: ProcessOwner::Desktop,
restart_policy: Some(RestartPolicy::Never),
})
.await?;
let info = DesktopRecordingInfo {
id: id.clone(),
status: DesktopRecordingStatus::Recording,
process_id: Some(snapshot.id),
file_name,
bytes: 0,
started_at: chrono::Utc::now().to_rfc3339(),
ended_at: None,
};
state.current_id = Some(id.clone());
state.recordings.insert(
id,
RecordingEntry {
info: info.clone(),
path,
},
);
Ok(info)
}
pub async fn stop(&self) -> Result<DesktopRecordingInfo, SandboxError> {
let (recording_id, process_id) = {
let mut state = self.inner.lock().await;
self.refresh_locked(&mut state).await?;
let recording_id = state.current_id.clone().ok_or_else(|| SandboxError::Conflict {
message: "no desktop recording is active".to_string(),
})?;
let process_id = state
.recordings
.get(&recording_id)
.and_then(|entry| entry.info.process_id.clone());
(recording_id, process_id)
};
if let Some(process_id) = process_id {
let snapshot = self.process_runtime.stop_process(&process_id, Some(5_000)).await?;
if snapshot.status == ProcessStatus::Running {
let _ = self.process_runtime.kill_process(&process_id, Some(1_000)).await;
}
}
let mut state = self.inner.lock().await;
self.refresh_locked(&mut state).await?;
let entry = state
.recordings
.get(&recording_id)
.ok_or_else(|| SandboxError::NotFound {
resource: "desktop_recording".to_string(),
id: recording_id.clone(),
})?;
Ok(entry.info.clone())
}
pub async fn list(&self) -> Result<DesktopRecordingListResponse, SandboxError> {
let mut state = self.inner.lock().await;
self.refresh_locked(&mut state).await?;
Ok(DesktopRecordingListResponse {
recordings: state.recordings.values().map(|entry| entry.info.clone()).collect(),
})
}
pub async fn get(&self, id: &str) -> Result<DesktopRecordingInfo, SandboxError> {
let mut state = self.inner.lock().await;
self.refresh_locked(&mut state).await?;
state
.recordings
.get(id)
.map(|entry| entry.info.clone())
.ok_or_else(|| SandboxError::NotFound {
resource: "desktop_recording".to_string(),
id: id.to_string(),
})
}
pub async fn download_path(&self, id: &str) -> Result<PathBuf, SandboxError> {
let mut state = self.inner.lock().await;
self.refresh_locked(&mut state).await?;
let entry = state
.recordings
.get(id)
.ok_or_else(|| SandboxError::NotFound {
resource: "desktop_recording".to_string(),
id: id.to_string(),
})?;
if !entry.path.is_file() {
return Err(SandboxError::NotFound {
resource: "desktop_recording_file".to_string(),
id: id.to_string(),
});
}
Ok(entry.path.clone())
}
pub async fn delete(&self, id: &str) -> Result<(), SandboxError> {
let mut state = self.inner.lock().await;
self.refresh_locked(&mut state).await?;
if state.current_id.as_deref() == Some(id) {
return Err(SandboxError::Conflict {
message: "stop the active desktop recording before deleting it".to_string(),
});
}
let entry = state
.recordings
.remove(id)
.ok_or_else(|| SandboxError::NotFound {
resource: "desktop_recording".to_string(),
id: id.to_string(),
})?;
if entry.path.exists() {
fs::remove_file(&entry.path).map_err(|err| SandboxError::StreamError {
message: format!(
"failed to delete desktop recording {}: {err}",
entry.path.display()
),
})?;
}
Ok(())
}
fn ensure_recordings_dir(&self) -> Result<(), SandboxError> {
fs::create_dir_all(&self.recordings_dir).map_err(|err| SandboxError::StreamError {
message: format!(
"failed to create desktop recordings dir {}: {err}",
self.recordings_dir.display()
),
})
}
async fn refresh_locked(&self, state: &mut DesktopRecordingState) -> Result<(), SandboxError> {
let ids: Vec<String> = state.recordings.keys().cloned().collect();
for id in ids {
let should_clear_current = {
let Some(entry) = state.recordings.get_mut(&id) else {
continue;
};
let Some(process_id) = entry.info.process_id.clone() else {
Self::refresh_bytes(entry);
continue;
};
let snapshot = match self.process_runtime.snapshot(&process_id).await {
Ok(snapshot) => snapshot,
Err(SandboxError::NotFound { .. }) => {
Self::finalize_entry(entry, false);
continue;
}
Err(err) => return Err(err),
};
if snapshot.status == ProcessStatus::Running {
Self::refresh_bytes(entry);
false
} else {
Self::finalize_entry(entry, snapshot.exit_code == Some(0));
true
}
};
if should_clear_current && state.current_id.as_deref() == Some(id.as_str()) {
state.current_id = None;
}
}
Ok(())
}
fn refresh_bytes(entry: &mut RecordingEntry) {
entry.info.bytes = file_size(&entry.path);
}
fn finalize_entry(entry: &mut RecordingEntry, success: bool) {
let bytes = file_size(&entry.path);
entry.info.status = if success || (entry.path.is_file() && bytes > 0) {
DesktopRecordingStatus::Completed
} else {
DesktopRecordingStatus::Failed
};
entry.info.ended_at.get_or_insert_with(|| chrono::Utc::now().to_rfc3339());
entry.info.bytes = bytes;
}
}
fn find_binary(name: &str) -> Option<PathBuf> {
let path_env = std::env::var_os("PATH")?;
for path in std::env::split_paths(&path_env) {
let candidate = path.join(name);
if candidate.is_file() {
return Some(candidate);
}
}
None
}
fn file_size(path: &Path) -> u64 {
fs::metadata(path).map(|metadata| metadata.len()).unwrap_or(0)
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,47 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use sandbox_agent_error::SandboxError;
use crate::desktop_types::DesktopStreamStatusResponse;
#[derive(Debug, Clone)]
pub struct DesktopStreamingManager {
inner: Arc<Mutex<DesktopStreamingState>>,
}
#[derive(Debug, Default)]
struct DesktopStreamingState {
active: bool,
}
impl DesktopStreamingManager {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(DesktopStreamingState::default())),
}
}
pub async fn start(&self) -> DesktopStreamStatusResponse {
let mut state = self.inner.lock().await;
state.active = true;
DesktopStreamStatusResponse { active: true }
}
pub async fn stop(&self) -> DesktopStreamStatusResponse {
let mut state = self.inner.lock().await;
state.active = false;
DesktopStreamStatusResponse { active: false }
}
pub async fn ensure_active(&self) -> Result<(), SandboxError> {
if self.inner.lock().await.active {
Ok(())
} else {
Err(SandboxError::Conflict {
message: "desktop streaming is not active".to_string(),
})
}
}
}

View file

@ -1,6 +1,6 @@
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use utoipa::{IntoParams, ToSchema};
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
@ -62,7 +62,7 @@ pub struct DesktopStatusResponse {
pub runtime_log_path: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, IntoParams, Default)]
#[serde(rename_all = "camelCase")]
pub struct DesktopStartRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
@ -73,17 +73,38 @@ pub struct DesktopStartRequest {
pub dpi: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, IntoParams, Default)]
#[serde(rename_all = "camelCase")]
pub struct DesktopScreenshotQuery {}
pub struct DesktopScreenshotQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub format: Option<DesktopScreenshotFormat>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quality: Option<u8>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub scale: Option<f32>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DesktopScreenshotFormat {
Png,
Jpeg,
Webp,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, IntoParams)]
#[serde(rename_all = "camelCase")]
pub struct DesktopRegionScreenshotQuery {
pub x: i32,
pub y: i32,
pub width: u32,
pub height: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub format: Option<DesktopScreenshotFormat>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub quality: Option<u8>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub scale: Option<f32>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
@ -123,6 +144,28 @@ pub struct DesktopMouseClickRequest {
pub click_count: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMouseDownRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub x: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub y: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub button: Option<DesktopMouseButton>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMouseUpRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub x: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub y: Option<i32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub button: Option<DesktopMouseButton>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopMouseDragRequest {
@ -157,6 +200,33 @@ pub struct DesktopKeyboardTypeRequest {
#[serde(rename_all = "camelCase")]
pub struct DesktopKeyboardPressRequest {
pub key: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub modifiers: Option<DesktopKeyModifiers>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub struct DesktopKeyModifiers {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ctrl: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub shift: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub alt: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cmd: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopKeyboardDownRequest {
pub key: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct DesktopKeyboardUpRequest {
pub key: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
@ -171,3 +241,62 @@ pub struct DesktopDisplayInfoResponse {
pub display: String,
pub resolution: DesktopResolution,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopWindowInfo {
pub id: String,
pub title: String,
pub x: i32,
pub y: i32,
pub width: u32,
pub height: u32,
pub is_active: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopWindowListResponse {
pub windows: Vec<DesktopWindowInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, Default)]
#[serde(rename_all = "camelCase")]
pub struct DesktopRecordingStartRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub fps: Option<u32>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DesktopRecordingStatus {
Recording,
Completed,
Failed,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopRecordingInfo {
pub id: String,
pub status: DesktopRecordingStatus,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub process_id: Option<String>,
pub file_name: String,
pub bytes: u64,
pub started_at: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ended_at: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopRecordingListResponse {
pub recordings: Vec<DesktopRecordingInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct DesktopStreamStatusResponse {
pub active: bool,
}

View file

@ -5,7 +5,9 @@ pub mod cli;
pub mod daemon;
mod desktop_errors;
mod desktop_install;
mod desktop_recording;
mod desktop_runtime;
mod desktop_streaming;
pub mod desktop_types;
mod process_runtime;
pub mod router;

View file

@ -1,5 +1,5 @@
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
@ -27,6 +27,22 @@ pub enum ProcessStream {
Pty,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessOwner {
User,
Desktop,
System,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RestartPolicy {
Never,
Always,
OnFailure,
}
#[derive(Debug, Clone)]
pub struct ProcessStartSpec {
pub command: String,
@ -35,6 +51,8 @@ pub struct ProcessStartSpec {
pub env: HashMap<String, String>,
pub tty: bool,
pub interactive: bool,
pub owner: ProcessOwner,
pub restart_policy: Option<RestartPolicy>,
}
#[derive(Debug, Clone)]
@ -78,6 +96,7 @@ pub struct ProcessSnapshot {
pub cwd: Option<String>,
pub tty: bool,
pub interactive: bool,
pub owner: ProcessOwner,
pub status: ProcessStatus,
pub pid: Option<u32>,
pub exit_code: Option<i32>,
@ -129,17 +148,27 @@ struct ManagedProcess {
cwd: Option<String>,
tty: bool,
interactive: bool,
owner: ProcessOwner,
#[allow(dead_code)]
restart_policy: RestartPolicy,
spec: ProcessStartSpec,
created_at_ms: i64,
pid: Option<u32>,
max_log_bytes: usize,
stdin: Mutex<Option<ProcessStdin>>,
#[cfg(unix)]
pty_resize_fd: Mutex<Option<std::fs::File>>,
runtime: Mutex<ManagedRuntime>,
status: RwLock<ManagedStatus>,
sequence: AtomicU64,
logs: Mutex<VecDeque<StoredLog>>,
total_log_bytes: Mutex<usize>,
log_tx: broadcast::Sender<ProcessLogLine>,
stop_requested: AtomicBool,
}
#[derive(Debug)]
struct ManagedRuntime {
pid: Option<u32>,
stdin: Option<ProcessStdin>,
#[cfg(unix)]
pty_resize_fd: Option<std::fs::File>,
}
#[derive(Debug)]
@ -162,17 +191,17 @@ struct ManagedStatus {
}
struct SpawnedPipeProcess {
process: Arc<ManagedProcess>,
child: Child,
stdout: tokio::process::ChildStdout,
stderr: tokio::process::ChildStderr,
runtime: ManagedRuntime,
}
#[cfg(unix)]
struct SpawnedTtyProcess {
process: Arc<ManagedProcess>,
child: Child,
reader: tokio::fs::File,
runtime: ManagedRuntime,
}
impl ProcessRuntime {
@ -224,21 +253,14 @@ impl ProcessRuntime {
&self,
spec: ProcessStartSpec,
) -> Result<ProcessSnapshot, SandboxError> {
let config = self.get_config().await;
let process_refs = {
let processes = self.inner.processes.read().await;
processes.values().cloned().collect::<Vec<_>>()
};
let mut running_count = 0usize;
for process in process_refs {
if process.status.read().await.status == ProcessStatus::Running {
running_count += 1;
}
if spec.command.trim().is_empty() {
return Err(SandboxError::InvalidRequest {
message: "command must not be empty".to_string(),
});
}
if running_count >= config.max_concurrent_processes {
let config = self.get_config().await;
if self.running_process_count().await >= config.max_concurrent_processes {
return Err(SandboxError::Conflict {
message: format!(
"max concurrent process limit reached ({})",
@ -247,73 +269,40 @@ impl ProcessRuntime {
});
}
if spec.command.trim().is_empty() {
return Err(SandboxError::InvalidRequest {
message: "command must not be empty".to_string(),
});
}
let id_num = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
let id = format!("proc_{id_num}");
if spec.tty {
#[cfg(unix)]
{
let spawned = self
.spawn_tty_process(id.clone(), spec, config.max_log_bytes_per_process)
.await?;
let process = spawned.process.clone();
self.inner
.processes
.write()
.await
.insert(id, process.clone());
let p = process.clone();
tokio::spawn(async move {
pump_output(p, spawned.reader, ProcessStream::Pty).await;
});
let p = process.clone();
tokio::spawn(async move {
watch_exit(p, spawned.child).await;
});
return Ok(process.snapshot().await);
}
#[cfg(not(unix))]
{
return Err(SandboxError::StreamError {
message: "tty process mode is not supported on this platform".to_string(),
});
}
}
let spawned = self
.spawn_pipe_process(id.clone(), spec, config.max_log_bytes_per_process)
.await?;
let process = spawned.process.clone();
self.inner
.processes
.write()
.await
.insert(id, process.clone());
let p = process.clone();
tokio::spawn(async move {
pump_output(p, spawned.stdout, ProcessStream::Stdout).await;
});
let p = process.clone();
tokio::spawn(async move {
pump_output(p, spawned.stderr, ProcessStream::Stderr).await;
});
let p = process.clone();
tokio::spawn(async move {
watch_exit(p, spawned.child).await;
let process = Arc::new(ManagedProcess {
id: id.clone(),
command: spec.command.clone(),
args: spec.args.clone(),
cwd: spec.cwd.clone(),
tty: spec.tty,
interactive: spec.interactive,
owner: spec.owner,
restart_policy: spec.restart_policy.unwrap_or(RestartPolicy::Never),
spec,
created_at_ms: now_ms(),
max_log_bytes: config.max_log_bytes_per_process,
runtime: Mutex::new(ManagedRuntime {
pid: None,
stdin: None,
#[cfg(unix)]
pty_resize_fd: None,
}),
status: RwLock::new(ManagedStatus {
status: ProcessStatus::Running,
exit_code: None,
exited_at_ms: None,
}),
sequence: AtomicU64::new(1),
logs: Mutex::new(VecDeque::new()),
total_log_bytes: Mutex::new(0),
log_tx: broadcast::channel(512).0,
stop_requested: AtomicBool::new(false),
});
self.spawn_existing_process(process.clone()).await?;
self.inner.processes.write().await.insert(id, process.clone());
Ok(process.snapshot().await)
}
@ -412,11 +401,13 @@ impl ProcessRuntime {
})
}
pub async fn list_processes(&self) -> Vec<ProcessSnapshot> {
pub async fn list_processes(&self, owner: Option<ProcessOwner>) -> Vec<ProcessSnapshot> {
let processes = self.inner.processes.read().await;
let mut items = Vec::with_capacity(processes.len());
for process in processes.values() {
items.push(process.snapshot().await);
if owner.is_none_or(|expected| process.owner == expected) {
items.push(process.snapshot().await);
}
}
items.sort_by(|a, b| a.id.cmp(&b.id));
items
@ -453,6 +444,7 @@ impl ProcessRuntime {
wait_ms: Option<u64>,
) -> Result<ProcessSnapshot, SandboxError> {
let process = self.lookup_process(id).await?;
process.stop_requested.store(true, Ordering::SeqCst);
process.send_signal(SIGTERM).await?;
maybe_wait_for_exit(process.clone(), wait_ms.unwrap_or(2_000)).await;
Ok(process.snapshot().await)
@ -464,6 +456,7 @@ impl ProcessRuntime {
wait_ms: Option<u64>,
) -> Result<ProcessSnapshot, SandboxError> {
let process = self.lookup_process(id).await?;
process.stop_requested.store(true, Ordering::SeqCst);
process.send_signal(SIGKILL).await?;
maybe_wait_for_exit(process.clone(), wait_ms.unwrap_or(1_000)).await;
Ok(process.snapshot().await)
@ -506,6 +499,17 @@ impl ProcessRuntime {
Ok(process.log_tx.subscribe())
}
async fn running_process_count(&self) -> usize {
let processes = self.inner.processes.read().await;
let mut running = 0usize;
for process in processes.values() {
if process.status.read().await.status == ProcessStatus::Running {
running += 1;
}
}
running
}
async fn lookup_process(&self, id: &str) -> Result<Arc<ManagedProcess>, SandboxError> {
let process = self.inner.processes.read().await.get(id).cloned();
process.ok_or_else(|| SandboxError::NotFound {
@ -514,12 +518,81 @@ impl ProcessRuntime {
})
}
async fn spawn_pipe_process(
async fn spawn_existing_process(
&self,
id: String,
spec: ProcessStartSpec,
max_log_bytes: usize,
) -> Result<SpawnedPipeProcess, SandboxError> {
process: Arc<ManagedProcess>,
) -> Result<(), SandboxError> {
process.stop_requested.store(false, Ordering::SeqCst);
let mut runtime_guard = process.runtime.lock().await;
let mut status_guard = process.status.write().await;
if process.tty {
#[cfg(unix)]
{
let SpawnedTtyProcess {
child,
reader,
runtime,
} = self.spawn_tty_process(&process.spec)?;
*runtime_guard = runtime;
status_guard.status = ProcessStatus::Running;
status_guard.exit_code = None;
status_guard.exited_at_ms = None;
drop(status_guard);
drop(runtime_guard);
let process_for_output = process.clone();
tokio::spawn(async move {
pump_output(process_for_output, reader, ProcessStream::Pty).await;
});
let runtime = self.clone();
tokio::spawn(async move {
watch_exit(runtime, process, child).await;
});
return Ok(());
}
#[cfg(not(unix))]
{
return Err(SandboxError::StreamError {
message: "tty process mode is not supported on this platform".to_string(),
});
}
}
let SpawnedPipeProcess {
child,
stdout,
stderr,
runtime,
} = self.spawn_pipe_process(&process.spec)?;
*runtime_guard = runtime;
status_guard.status = ProcessStatus::Running;
status_guard.exit_code = None;
status_guard.exited_at_ms = None;
drop(status_guard);
drop(runtime_guard);
let process_for_stdout = process.clone();
tokio::spawn(async move {
pump_output(process_for_stdout, stdout, ProcessStream::Stdout).await;
});
let process_for_stderr = process.clone();
tokio::spawn(async move {
pump_output(process_for_stderr, stderr, ProcessStream::Stderr).await;
});
let runtime = self.clone();
tokio::spawn(async move {
watch_exit(runtime, process, child).await;
});
Ok(())
}
fn spawn_pipe_process(&self, spec: &ProcessStartSpec) -> Result<SpawnedPipeProcess, SandboxError> {
let mut cmd = Command::new(&spec.command);
cmd.args(&spec.args)
.stdin(std::process::Stdio::piped())
@ -551,35 +624,14 @@ impl ProcessRuntime {
.ok_or_else(|| SandboxError::StreamError {
message: "failed to capture stderr".to_string(),
})?;
let pid = child.id();
let (tx, _rx) = broadcast::channel(512);
let process = Arc::new(ManagedProcess {
id,
command: spec.command,
args: spec.args,
cwd: spec.cwd,
tty: false,
interactive: spec.interactive,
created_at_ms: now_ms(),
pid,
max_log_bytes,
stdin: Mutex::new(stdin.map(ProcessStdin::Pipe)),
#[cfg(unix)]
pty_resize_fd: Mutex::new(None),
status: RwLock::new(ManagedStatus {
status: ProcessStatus::Running,
exit_code: None,
exited_at_ms: None,
}),
sequence: AtomicU64::new(1),
logs: Mutex::new(VecDeque::new()),
total_log_bytes: Mutex::new(0),
log_tx: tx,
});
Ok(SpawnedPipeProcess {
process,
runtime: ManagedRuntime {
pid: child.id(),
stdin: stdin.map(ProcessStdin::Pipe),
#[cfg(unix)]
pty_resize_fd: None,
},
child,
stdout,
stderr,
@ -587,12 +639,7 @@ impl ProcessRuntime {
}
#[cfg(unix)]
async fn spawn_tty_process(
&self,
id: String,
spec: ProcessStartSpec,
max_log_bytes: usize,
) -> Result<SpawnedTtyProcess, SandboxError> {
fn spawn_tty_process(&self, spec: &ProcessStartSpec) -> Result<SpawnedTtyProcess, SandboxError> {
use std::os::fd::AsRawFd;
use std::process::Stdio;
@ -632,8 +679,8 @@ impl ProcessRuntime {
let child = cmd.spawn().map_err(|err| SandboxError::StreamError {
message: format!("failed to spawn tty process: {err}"),
})?;
let pid = child.id();
drop(slave_fd);
let master_raw = master_fd.as_raw_fd();
@ -644,32 +691,12 @@ impl ProcessRuntime {
let writer_file = tokio::fs::File::from_std(std::fs::File::from(writer_fd));
let resize_file = std::fs::File::from(resize_fd);
let (tx, _rx) = broadcast::channel(512);
let process = Arc::new(ManagedProcess {
id,
command: spec.command,
args: spec.args,
cwd: spec.cwd,
tty: true,
interactive: spec.interactive,
created_at_ms: now_ms(),
pid,
max_log_bytes,
stdin: Mutex::new(Some(ProcessStdin::Pty(writer_file))),
pty_resize_fd: Mutex::new(Some(resize_file)),
status: RwLock::new(ManagedStatus {
status: ProcessStatus::Running,
exit_code: None,
exited_at_ms: None,
}),
sequence: AtomicU64::new(1),
logs: Mutex::new(VecDeque::new()),
total_log_bytes: Mutex::new(0),
log_tx: tx,
});
Ok(SpawnedTtyProcess {
process,
runtime: ManagedRuntime {
pid,
stdin: Some(ProcessStdin::Pty(writer_file)),
pty_resize_fd: Some(resize_file),
},
child,
reader: reader_file,
})
@ -694,6 +721,7 @@ pub struct ProcessLogFilter {
impl ManagedProcess {
async fn snapshot(&self) -> ProcessSnapshot {
let status = self.status.read().await.clone();
let pid = self.runtime.lock().await.pid;
ProcessSnapshot {
id: self.id.clone(),
command: self.command.clone(),
@ -701,8 +729,9 @@ impl ManagedProcess {
cwd: self.cwd.clone(),
tty: self.tty,
interactive: self.interactive,
owner: self.owner,
status: status.status,
pid: self.pid,
pid,
exit_code: status.exit_code,
created_at_ms: self.created_at_ms,
exited_at_ms: status.exited_at_ms,
@ -752,8 +781,8 @@ impl ManagedProcess {
});
}
let mut guard = self.stdin.lock().await;
let stdin = guard.as_mut().ok_or_else(|| SandboxError::Conflict {
let mut runtime = self.runtime.lock().await;
let stdin = runtime.stdin.as_mut().ok_or_else(|| SandboxError::Conflict {
message: "process does not accept stdin".to_string(),
})?;
@ -825,7 +854,7 @@ impl ManagedProcess {
if self.status.read().await.status != ProcessStatus::Running {
return Ok(());
}
let Some(pid) = self.pid else {
let Some(pid) = self.runtime.lock().await.pid else {
return Ok(());
};
@ -840,8 +869,9 @@ impl ManagedProcess {
#[cfg(unix)]
{
use std::os::fd::AsRawFd;
let guard = self.pty_resize_fd.lock().await;
let Some(fd) = guard.as_ref() else {
let runtime = self.runtime.lock().await;
let Some(fd) = runtime.pty_resize_fd.as_ref() else {
return Err(SandboxError::Conflict {
message: "PTY resize handle unavailable".to_string(),
});
@ -857,6 +887,32 @@ impl ManagedProcess {
Ok(())
}
#[allow(dead_code)]
fn should_restart(&self, exit_code: Option<i32>) -> bool {
match self.restart_policy {
RestartPolicy::Never => false,
RestartPolicy::Always => true,
RestartPolicy::OnFailure => exit_code.unwrap_or(1) != 0,
}
}
async fn mark_exited(&self, exit_code: Option<i32>, exited_at_ms: Option<i64>) {
{
let mut status = self.status.write().await;
status.status = ProcessStatus::Exited;
status.exit_code = exit_code;
status.exited_at_ms = exited_at_ms;
}
let mut runtime = self.runtime.lock().await;
runtime.pid = None;
let _ = runtime.stdin.take();
#[cfg(unix)]
{
let _ = runtime.pty_resize_fd.take();
}
}
}
fn stream_matches(stream: ProcessStream, filter: ProcessLogFilterStream) -> bool {
@ -909,21 +965,16 @@ where
}
}
async fn watch_exit(process: Arc<ManagedProcess>, mut child: Child) {
async fn watch_exit(runtime: ProcessRuntime, process: Arc<ManagedProcess>, mut child: Child) {
let _ = runtime;
let wait = child.wait().await;
let (exit_code, exited_at_ms) = match wait {
Ok(status) => (status.code(), Some(now_ms())),
Err(_) => (None, Some(now_ms())),
};
{
let mut state = process.status.write().await;
state.status = ProcessStatus::Exited;
state.exit_code = exit_code;
state.exited_at_ms = exited_at_ms;
}
let _ = process.stdin.lock().await.take();
let _ = process.stop_requested.swap(false, Ordering::SeqCst);
process.mark_exited(exit_code, exited_at_ms).await;
}
async fn capture_output<R>(mut reader: R, max_bytes: usize) -> std::io::Result<(Vec<u8>, bool)>

View file

@ -34,15 +34,16 @@ use tar::Archive;
use tokio_stream::wrappers::BroadcastStream;
use tower_http::trace::TraceLayer;
use tracing::Span;
use utoipa::{Modify, OpenApi, ToSchema};
use utoipa::{IntoParams, Modify, OpenApi, ToSchema};
use crate::acp_proxy_runtime::{AcpProxyRuntime, ProxyPostOutcome};
use crate::desktop_errors::DesktopProblem;
use crate::desktop_runtime::DesktopRuntime;
use crate::desktop_types::*;
use crate::process_runtime::{
decode_input_bytes, ProcessLogFilter, ProcessLogFilterStream, ProcessRuntime,
ProcessRuntimeConfig, ProcessSnapshot, ProcessStartSpec, ProcessStatus, ProcessStream, RunSpec,
decode_input_bytes, ProcessLogFilter, ProcessLogFilterStream, ProcessOwner as RuntimeProcessOwner,
ProcessRuntime, ProcessRuntimeConfig, ProcessSnapshot, ProcessStartSpec, ProcessStatus,
ProcessStream, RunSpec,
};
use crate::ui;
@ -115,7 +116,7 @@ impl AppState {
},
));
let process_runtime = Arc::new(ProcessRuntime::new());
let desktop_runtime = Arc::new(DesktopRuntime::new());
let desktop_runtime = Arc::new(DesktopRuntime::new(process_runtime.clone()));
Self {
auth,
agent_manager,
@ -196,6 +197,8 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
)
.route("/desktop/mouse/move", post(post_v1_desktop_mouse_move))
.route("/desktop/mouse/click", post(post_v1_desktop_mouse_click))
.route("/desktop/mouse/down", post(post_v1_desktop_mouse_down))
.route("/desktop/mouse/up", post(post_v1_desktop_mouse_up))
.route("/desktop/mouse/drag", post(post_v1_desktop_mouse_drag))
.route("/desktop/mouse/scroll", post(post_v1_desktop_mouse_scroll))
.route(
@ -206,7 +209,33 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
"/desktop/keyboard/press",
post(post_v1_desktop_keyboard_press),
)
.route(
"/desktop/keyboard/down",
post(post_v1_desktop_keyboard_down),
)
.route("/desktop/keyboard/up", post(post_v1_desktop_keyboard_up))
.route("/desktop/display/info", get(get_v1_desktop_display_info))
.route("/desktop/windows", get(get_v1_desktop_windows))
.route(
"/desktop/recording/start",
post(post_v1_desktop_recording_start),
)
.route(
"/desktop/recording/stop",
post(post_v1_desktop_recording_stop),
)
.route("/desktop/recordings", get(get_v1_desktop_recordings))
.route(
"/desktop/recordings/:id",
get(get_v1_desktop_recording).delete(delete_v1_desktop_recording),
)
.route(
"/desktop/recordings/:id/download",
get(get_v1_desktop_recording_download),
)
.route("/desktop/stream/start", post(post_v1_desktop_stream_start))
.route("/desktop/stream/stop", post(post_v1_desktop_stream_stop))
.route("/desktop/stream/ws", get(get_v1_desktop_stream_ws))
.route("/agents", get(get_v1_agents))
.route("/agents/:agent", get(get_v1_agent))
.route("/agents/:agent/install", post(post_v1_agent_install))
@ -366,11 +395,25 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
get_v1_desktop_mouse_position,
post_v1_desktop_mouse_move,
post_v1_desktop_mouse_click,
post_v1_desktop_mouse_down,
post_v1_desktop_mouse_up,
post_v1_desktop_mouse_drag,
post_v1_desktop_mouse_scroll,
post_v1_desktop_keyboard_type,
post_v1_desktop_keyboard_press,
post_v1_desktop_keyboard_down,
post_v1_desktop_keyboard_up,
get_v1_desktop_display_info,
get_v1_desktop_windows,
post_v1_desktop_recording_start,
post_v1_desktop_recording_stop,
get_v1_desktop_recordings,
get_v1_desktop_recording,
get_v1_desktop_recording_download,
delete_v1_desktop_recording,
post_v1_desktop_stream_start,
post_v1_desktop_stream_stop,
get_v1_desktop_stream_ws,
get_v1_agents,
get_v1_agent,
post_v1_agent_install,
@ -416,17 +459,30 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
DesktopStatusResponse,
DesktopStartRequest,
DesktopScreenshotQuery,
DesktopScreenshotFormat,
DesktopRegionScreenshotQuery,
DesktopMousePositionResponse,
DesktopMouseButton,
DesktopMouseMoveRequest,
DesktopMouseClickRequest,
DesktopMouseDownRequest,
DesktopMouseUpRequest,
DesktopMouseDragRequest,
DesktopMouseScrollRequest,
DesktopKeyboardTypeRequest,
DesktopKeyboardPressRequest,
DesktopKeyModifiers,
DesktopKeyboardDownRequest,
DesktopKeyboardUpRequest,
DesktopActionResponse,
DesktopDisplayInfoResponse,
DesktopWindowInfo,
DesktopWindowListResponse,
DesktopRecordingStartRequest,
DesktopRecordingStatus,
DesktopRecordingInfo,
DesktopRecordingListResponse,
DesktopStreamStatusResponse,
ServerStatus,
ServerStatusInfo,
AgentCapabilities,
@ -448,12 +504,14 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
FsActionResponse,
FsUploadBatchResponse,
ProcessConfig,
ProcessOwner,
ProcessCreateRequest,
ProcessRunRequest,
ProcessRunResponse,
ProcessState,
ProcessInfo,
ProcessListResponse,
ProcessListQuery,
ProcessLogsStream,
ProcessLogsQuery,
ProcessLogEntry,
@ -616,40 +674,42 @@ async fn post_v1_desktop_stop(
/// Capture a full desktop screenshot.
///
/// Performs a health-gated full-frame screenshot of the managed desktop and
/// returns PNG bytes.
/// returns the requested image bytes.
#[utoipa::path(
get,
path = "/v1/desktop/screenshot",
tag = "v1",
params(DesktopScreenshotQuery),
responses(
(status = 200, description = "Desktop screenshot as PNG bytes"),
(status = 200, description = "Desktop screenshot as image bytes"),
(status = 400, description = "Invalid screenshot query", body = ProblemDetails),
(status = 409, description = "Desktop runtime is not ready", body = ProblemDetails),
(status = 502, description = "Desktop runtime health or screenshot capture failed", body = ProblemDetails)
)
)]
async fn get_v1_desktop_screenshot(
State(state): State<Arc<AppState>>,
Query(query): Query<DesktopScreenshotQuery>,
) -> Result<Response, ApiError> {
let bytes = state.desktop_runtime().screenshot().await?;
Ok(([(header::CONTENT_TYPE, "image/png")], Bytes::from(bytes)).into_response())
let screenshot = state.desktop_runtime().screenshot(query).await?;
Ok((
[(header::CONTENT_TYPE, screenshot.content_type)],
Bytes::from(screenshot.bytes),
)
.into_response())
}
/// Capture a desktop screenshot region.
///
/// Performs a health-gated screenshot crop against the managed desktop and
/// returns the requested PNG region bytes.
/// returns the requested region image bytes.
#[utoipa::path(
get,
path = "/v1/desktop/screenshot/region",
tag = "v1",
params(
("x" = i32, Query, description = "Region x coordinate"),
("y" = i32, Query, description = "Region y coordinate"),
("width" = u32, Query, description = "Region width"),
("height" = u32, Query, description = "Region height")
),
params(DesktopRegionScreenshotQuery),
responses(
(status = 200, description = "Desktop screenshot region as PNG bytes"),
(status = 200, description = "Desktop screenshot region as image bytes"),
(status = 400, description = "Invalid screenshot region", body = ProblemDetails),
(status = 409, description = "Desktop runtime is not ready", body = ProblemDetails),
(status = 502, description = "Desktop runtime health or screenshot capture failed", body = ProblemDetails)
@ -659,8 +719,12 @@ async fn get_v1_desktop_screenshot_region(
State(state): State<Arc<AppState>>,
Query(query): Query<DesktopRegionScreenshotQuery>,
) -> Result<Response, ApiError> {
let bytes = state.desktop_runtime().screenshot_region(query).await?;
Ok(([(header::CONTENT_TYPE, "image/png")], Bytes::from(bytes)).into_response())
let screenshot = state.desktop_runtime().screenshot_region(query).await?;
Ok((
[(header::CONTENT_TYPE, screenshot.content_type)],
Bytes::from(screenshot.bytes),
)
.into_response())
}
/// Get the current desktop mouse position.
@ -731,6 +795,54 @@ async fn post_v1_desktop_mouse_click(
Ok(Json(position))
}
/// Press and hold a desktop mouse button.
///
/// Performs a health-gated optional pointer move followed by `xdotool mousedown`
/// and returns the resulting mouse position.
#[utoipa::path(
post,
path = "/v1/desktop/mouse/down",
tag = "v1",
request_body = DesktopMouseDownRequest,
responses(
(status = 200, description = "Desktop mouse position after button press", body = DesktopMousePositionResponse),
(status = 400, description = "Invalid mouse down request", body = ProblemDetails),
(status = 409, description = "Desktop runtime is not ready", body = ProblemDetails),
(status = 502, description = "Desktop runtime health or input failed", body = ProblemDetails)
)
)]
async fn post_v1_desktop_mouse_down(
State(state): State<Arc<AppState>>,
Json(body): Json<DesktopMouseDownRequest>,
) -> Result<Json<DesktopMousePositionResponse>, ApiError> {
let position = state.desktop_runtime().mouse_down(body).await?;
Ok(Json(position))
}
/// Release a desktop mouse button.
///
/// Performs a health-gated optional pointer move followed by `xdotool mouseup`
/// and returns the resulting mouse position.
#[utoipa::path(
post,
path = "/v1/desktop/mouse/up",
tag = "v1",
request_body = DesktopMouseUpRequest,
responses(
(status = 200, description = "Desktop mouse position after button release", body = DesktopMousePositionResponse),
(status = 400, description = "Invalid mouse up request", body = ProblemDetails),
(status = 409, description = "Desktop runtime is not ready", body = ProblemDetails),
(status = 502, description = "Desktop runtime health or input failed", body = ProblemDetails)
)
)]
async fn post_v1_desktop_mouse_up(
State(state): State<Arc<AppState>>,
Json(body): Json<DesktopMouseUpRequest>,
) -> Result<Json<DesktopMousePositionResponse>, ApiError> {
let position = state.desktop_runtime().mouse_up(body).await?;
Ok(Json(position))
}
/// Drag the desktop mouse.
///
/// Performs a health-gated drag gesture against the managed desktop and
@ -827,6 +939,54 @@ async fn post_v1_desktop_keyboard_press(
Ok(Json(response))
}
/// Press and hold a desktop keyboard key.
///
/// Performs a health-gated `xdotool keydown` operation against the managed
/// desktop.
#[utoipa::path(
post,
path = "/v1/desktop/keyboard/down",
tag = "v1",
request_body = DesktopKeyboardDownRequest,
responses(
(status = 200, description = "Desktop keyboard action result", body = DesktopActionResponse),
(status = 400, description = "Invalid keyboard down request", body = ProblemDetails),
(status = 409, description = "Desktop runtime is not ready", body = ProblemDetails),
(status = 502, description = "Desktop runtime health or input failed", body = ProblemDetails)
)
)]
async fn post_v1_desktop_keyboard_down(
State(state): State<Arc<AppState>>,
Json(body): Json<DesktopKeyboardDownRequest>,
) -> Result<Json<DesktopActionResponse>, ApiError> {
let response = state.desktop_runtime().key_down(body).await?;
Ok(Json(response))
}
/// Release a desktop keyboard key.
///
/// Performs a health-gated `xdotool keyup` operation against the managed
/// desktop.
#[utoipa::path(
post,
path = "/v1/desktop/keyboard/up",
tag = "v1",
request_body = DesktopKeyboardUpRequest,
responses(
(status = 200, description = "Desktop keyboard action result", body = DesktopActionResponse),
(status = 400, description = "Invalid keyboard up request", body = ProblemDetails),
(status = 409, description = "Desktop runtime is not ready", body = ProblemDetails),
(status = 502, description = "Desktop runtime health or input failed", body = ProblemDetails)
)
)]
async fn post_v1_desktop_keyboard_up(
State(state): State<Arc<AppState>>,
Json(body): Json<DesktopKeyboardUpRequest>,
) -> Result<Json<DesktopActionResponse>, ApiError> {
let response = state.desktop_runtime().key_up(body).await?;
Ok(Json(response))
}
/// Get desktop display information.
///
/// Performs a health-gated display query against the managed desktop and
@ -848,6 +1008,225 @@ async fn get_v1_desktop_display_info(
Ok(Json(info))
}
/// List visible desktop windows.
///
/// Performs a health-gated visible-window enumeration against the managed
/// desktop and returns the current window metadata.
#[utoipa::path(
get,
path = "/v1/desktop/windows",
tag = "v1",
responses(
(status = 200, description = "Visible desktop windows", body = DesktopWindowListResponse),
(status = 409, description = "Desktop runtime is not ready", body = ProblemDetails),
(status = 503, description = "Desktop runtime health or window query failed", body = ProblemDetails)
)
)]
async fn get_v1_desktop_windows(
State(state): State<Arc<AppState>>,
) -> Result<Json<DesktopWindowListResponse>, ApiError> {
let windows = state.desktop_runtime().list_windows().await?;
Ok(Json(windows))
}
/// Start desktop recording.
///
/// Starts an ffmpeg x11grab recording against the managed desktop and returns
/// the created recording metadata.
#[utoipa::path(
post,
path = "/v1/desktop/recording/start",
tag = "v1",
request_body = DesktopRecordingStartRequest,
responses(
(status = 200, description = "Desktop recording started", body = DesktopRecordingInfo),
(status = 409, description = "Desktop runtime is not ready or a recording is already active", body = ProblemDetails),
(status = 502, description = "Desktop recording failed", body = ProblemDetails)
)
)]
async fn post_v1_desktop_recording_start(
State(state): State<Arc<AppState>>,
Json(body): Json<DesktopRecordingStartRequest>,
) -> Result<Json<DesktopRecordingInfo>, ApiError> {
let recording = state.desktop_runtime().start_recording(body).await?;
Ok(Json(recording))
}
/// Stop desktop recording.
///
/// Stops the active desktop recording and returns the finalized recording
/// metadata.
#[utoipa::path(
post,
path = "/v1/desktop/recording/stop",
tag = "v1",
responses(
(status = 200, description = "Desktop recording stopped", body = DesktopRecordingInfo),
(status = 409, description = "No active desktop recording", body = ProblemDetails),
(status = 502, description = "Desktop recording stop failed", body = ProblemDetails)
)
)]
async fn post_v1_desktop_recording_stop(
State(state): State<Arc<AppState>>,
) -> Result<Json<DesktopRecordingInfo>, ApiError> {
let recording = state.desktop_runtime().stop_recording().await?;
Ok(Json(recording))
}
/// List desktop recordings.
///
/// Returns the current desktop recording catalog.
#[utoipa::path(
get,
path = "/v1/desktop/recordings",
tag = "v1",
responses(
(status = 200, description = "Desktop recordings", body = DesktopRecordingListResponse),
(status = 502, description = "Desktop recordings query failed", body = ProblemDetails)
)
)]
async fn get_v1_desktop_recordings(
State(state): State<Arc<AppState>>,
) -> Result<Json<DesktopRecordingListResponse>, ApiError> {
let recordings = state.desktop_runtime().list_recordings().await?;
Ok(Json(recordings))
}
/// Get desktop recording metadata.
///
/// Returns metadata for a single desktop recording.
#[utoipa::path(
get,
path = "/v1/desktop/recordings/{id}",
tag = "v1",
params(
("id" = String, Path, description = "Desktop recording ID")
),
responses(
(status = 200, description = "Desktop recording metadata", body = DesktopRecordingInfo),
(status = 404, description = "Unknown desktop recording", body = ProblemDetails)
)
)]
async fn get_v1_desktop_recording(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<Json<DesktopRecordingInfo>, ApiError> {
let recording = state.desktop_runtime().get_recording(&id).await?;
Ok(Json(recording))
}
/// Download a desktop recording.
///
/// Serves the recorded MP4 bytes for a completed desktop recording.
#[utoipa::path(
get,
path = "/v1/desktop/recordings/{id}/download",
tag = "v1",
params(
("id" = String, Path, description = "Desktop recording ID")
),
responses(
(status = 200, description = "Desktop recording as MP4 bytes"),
(status = 404, description = "Unknown desktop recording", body = ProblemDetails)
)
)]
async fn get_v1_desktop_recording_download(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<Response, ApiError> {
let path = state.desktop_runtime().recording_download_path(&id).await?;
let bytes = tokio::fs::read(&path).await.map_err(|err| SandboxError::StreamError {
message: format!("failed to read desktop recording {}: {err}", path.display()),
})?;
Ok(([(header::CONTENT_TYPE, "video/mp4")], Bytes::from(bytes)).into_response())
}
/// Delete a desktop recording.
///
/// Removes a completed desktop recording and its file from disk.
#[utoipa::path(
delete,
path = "/v1/desktop/recordings/{id}",
tag = "v1",
params(
("id" = String, Path, description = "Desktop recording ID")
),
responses(
(status = 204, description = "Desktop recording deleted"),
(status = 404, description = "Unknown desktop recording", body = ProblemDetails),
(status = 409, description = "Desktop recording is still active", body = ProblemDetails)
)
)]
async fn delete_v1_desktop_recording(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<StatusCode, ApiError> {
state.desktop_runtime().delete_recording(&id).await?;
Ok(StatusCode::NO_CONTENT)
}
/// Start desktop streaming.
///
/// Enables desktop websocket streaming for the managed desktop.
#[utoipa::path(
post,
path = "/v1/desktop/stream/start",
tag = "v1",
responses(
(status = 200, description = "Desktop streaming started", body = DesktopStreamStatusResponse)
)
)]
async fn post_v1_desktop_stream_start(
State(state): State<Arc<AppState>>,
) -> Result<Json<DesktopStreamStatusResponse>, ApiError> {
Ok(Json(state.desktop_runtime().start_streaming().await))
}
/// Stop desktop streaming.
///
/// Disables desktop websocket streaming for the managed desktop.
#[utoipa::path(
post,
path = "/v1/desktop/stream/stop",
tag = "v1",
responses(
(status = 200, description = "Desktop streaming stopped", body = DesktopStreamStatusResponse)
)
)]
async fn post_v1_desktop_stream_stop(
State(state): State<Arc<AppState>>,
) -> Result<Json<DesktopStreamStatusResponse>, ApiError> {
Ok(Json(state.desktop_runtime().stop_streaming().await))
}
/// Open a desktop websocket streaming session.
///
/// Upgrades the connection to a websocket that streams JPEG desktop frames and
/// accepts mouse and keyboard control frames.
#[utoipa::path(
get,
path = "/v1/desktop/stream/ws",
tag = "v1",
params(
("access_token" = Option<String>, Query, description = "Bearer token alternative for WS auth")
),
responses(
(status = 101, description = "WebSocket upgraded"),
(status = 409, description = "Desktop runtime or streaming session is not ready", body = ProblemDetails),
(status = 502, description = "Desktop stream failed", body = ProblemDetails)
)
)]
async fn get_v1_desktop_stream_ws(
State(state): State<Arc<AppState>>,
Query(_query): Query<ProcessWsQuery>,
ws: WebSocketUpgrade,
) -> Result<Response, ApiError> {
state.desktop_runtime().ensure_streaming_active().await?;
Ok(ws
.on_upgrade(move |socket| desktop_stream_ws_session(socket, state.desktop_runtime()))
.into_response())
}
#[utoipa::path(
get,
path = "/v1/agents",
@ -1610,6 +1989,8 @@ async fn post_v1_processes(
env: body.env.into_iter().collect(),
tty: body.tty,
interactive: body.interactive,
owner: RuntimeProcessOwner::User,
restart_policy: None,
})
.await?;
@ -1670,6 +2051,7 @@ async fn post_v1_processes_run(
get,
path = "/v1/processes",
tag = "v1",
params(ProcessListQuery),
responses(
(status = 200, description = "List processes", body = ProcessListResponse),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
@ -1677,12 +2059,16 @@ async fn post_v1_processes_run(
)]
async fn get_v1_processes(
State(state): State<Arc<AppState>>,
Query(query): Query<ProcessListQuery>,
) -> Result<Json<ProcessListResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let snapshots = state.process_runtime().list_processes().await;
let snapshots = state
.process_runtime()
.list_processes(query.owner.map(into_runtime_process_owner))
.await;
Ok(Json(ProcessListResponse {
processes: snapshots.into_iter().map(map_process_snapshot).collect(),
}))
@ -2063,6 +2449,46 @@ enum TerminalClientFrame {
Close,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
enum DesktopStreamClientFrame {
MoveMouse {
x: i32,
y: i32,
},
MouseDown {
#[serde(default)]
x: Option<i32>,
#[serde(default)]
y: Option<i32>,
#[serde(default)]
button: Option<DesktopMouseButton>,
},
MouseUp {
#[serde(default)]
x: Option<i32>,
#[serde(default)]
y: Option<i32>,
#[serde(default)]
button: Option<DesktopMouseButton>,
},
Scroll {
x: i32,
y: i32,
#[serde(default)]
delta_x: Option<i32>,
#[serde(default)]
delta_y: Option<i32>,
},
KeyDown {
key: String,
},
KeyUp {
key: String,
},
Close,
}
async fn process_terminal_ws_session(
mut socket: WebSocket,
runtime: Arc<ProcessRuntime>,
@ -2175,6 +2601,133 @@ async fn process_terminal_ws_session(
}
}
async fn desktop_stream_ws_session(mut socket: WebSocket, desktop_runtime: Arc<DesktopRuntime>) {
let display_info = match desktop_runtime.display_info().await {
Ok(info) => info,
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
let _ = socket.close().await;
return;
}
};
if send_ws_json(
&mut socket,
json!({
"type": "ready",
"width": display_info.resolution.width,
"height": display_info.resolution.height,
}),
)
.await
.is_err()
{
return;
}
let mut frame_tick = tokio::time::interval(Duration::from_millis(100));
loop {
tokio::select! {
ws_in = socket.recv() => {
match ws_in {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<DesktopStreamClientFrame>(&text) {
Ok(DesktopStreamClientFrame::MoveMouse { x, y }) => {
if let Err(err) = desktop_runtime
.move_mouse(DesktopMouseMoveRequest { x, y })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::MouseDown { x, y, button }) => {
if let Err(err) = desktop_runtime
.mouse_down(DesktopMouseDownRequest { x, y, button })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::MouseUp { x, y, button }) => {
if let Err(err) = desktop_runtime
.mouse_up(DesktopMouseUpRequest { x, y, button })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::Scroll { x, y, delta_x, delta_y }) => {
if let Err(err) = desktop_runtime
.scroll_mouse(DesktopMouseScrollRequest {
x,
y,
delta_x,
delta_y,
})
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::KeyDown { key }) => {
if let Err(err) = desktop_runtime
.key_down(DesktopKeyboardDownRequest { key })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::KeyUp { key }) => {
if let Err(err) = desktop_runtime
.key_up(DesktopKeyboardUpRequest { key })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::Close) => {
let _ = socket.close().await;
break;
}
Err(err) => {
let _ = send_ws_error(&mut socket, &format!("invalid desktop stream frame: {err}")).await;
}
}
}
Some(Ok(Message::Ping(payload))) => {
let _ = socket.send(Message::Pong(payload)).await;
}
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(Message::Binary(_))) | Some(Ok(Message::Pong(_))) => {}
Some(Err(_)) => break,
}
}
_ = frame_tick.tick() => {
let frame = desktop_runtime
.screenshot(DesktopScreenshotQuery {
format: Some(DesktopScreenshotFormat::Jpeg),
quality: Some(60),
scale: Some(1.0),
})
.await;
match frame {
Ok(frame) => {
if socket.send(Message::Binary(frame.bytes.into())).await.is_err() {
break;
}
}
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
let _ = socket.close().await;
break;
}
}
}
}
}
}
async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
socket
.send(Message::Text(
@ -2543,6 +3096,14 @@ fn into_runtime_process_config(config: ProcessConfig) -> ProcessRuntimeConfig {
}
}
fn into_runtime_process_owner(owner: ProcessOwner) -> RuntimeProcessOwner {
match owner {
ProcessOwner::User => RuntimeProcessOwner::User,
ProcessOwner::Desktop => RuntimeProcessOwner::Desktop,
ProcessOwner::System => RuntimeProcessOwner::System,
}
}
fn map_process_snapshot(snapshot: ProcessSnapshot) -> ProcessInfo {
ProcessInfo {
id: snapshot.id,
@ -2551,6 +3112,11 @@ fn map_process_snapshot(snapshot: ProcessSnapshot) -> ProcessInfo {
cwd: snapshot.cwd,
tty: snapshot.tty,
interactive: snapshot.interactive,
owner: match snapshot.owner {
RuntimeProcessOwner::User => ProcessOwner::User,
RuntimeProcessOwner::Desktop => ProcessOwner::Desktop,
RuntimeProcessOwner::System => ProcessOwner::System,
},
status: match snapshot.status {
ProcessStatus::Running => ProcessState::Running,
ProcessStatus::Exited => ProcessState::Exited,

View file

@ -33,7 +33,8 @@ pub(super) async fn require_token(
.and_then(|value| value.to_str().ok())
.and_then(|value| value.strip_prefix("Bearer "));
let allow_query_token = request.uri().path().ends_with("/terminal/ws");
let allow_query_token = request.uri().path().ends_with("/terminal/ws")
|| request.uri().path().ends_with("/stream/ws");
let query_token = if allow_query_token {
request
.uri()

View file

@ -425,6 +425,14 @@ pub enum ProcessState {
Exited,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessOwner {
User,
Desktop,
System,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInfo {
@ -435,6 +443,7 @@ pub struct ProcessInfo {
pub cwd: Option<String>,
pub tty: bool,
pub interactive: bool,
pub owner: ProcessOwner,
pub status: ProcessState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
@ -451,6 +460,13 @@ pub struct ProcessListResponse {
pub processes: Vec<ProcessInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, IntoParams)]
#[serde(rename_all = "camelCase")]
pub struct ProcessListQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub owner: Option<ProcessOwner>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessLogsStream {

View file

@ -1,6 +1,28 @@
use super::*;
use futures::{SinkExt, StreamExt};
use serial_test::serial;
use std::collections::BTreeMap;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
fn png_dimensions(bytes: &[u8]) -> (u32, u32) {
assert!(bytes.starts_with(b"\x89PNG\r\n\x1a\n"));
let width = u32::from_be_bytes(bytes[16..20].try_into().expect("png width bytes"));
let height = u32::from_be_bytes(bytes[20..24].try_into().expect("png height bytes"));
(width, height)
}
async fn recv_ws_message(
ws: &mut tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
) -> Message {
tokio::time::timeout(Duration::from_secs(5), ws.next())
.await
.expect("timed out waiting for websocket frame")
.expect("websocket stream ended")
.expect("websocket frame")
}
#[tokio::test]
#[serial]
@ -89,6 +111,43 @@ async fn v1_desktop_lifecycle_and_actions_work_with_real_runtime() {
Some("image/png")
);
assert!(body.starts_with(b"\x89PNG\r\n\x1a\n"));
assert_eq!(png_dimensions(&body), (1440, 900));
let (status, headers, body) = send_request_raw(
&test_app.app,
Method::GET,
"/v1/desktop/screenshot?format=jpeg&quality=50",
None,
&[],
None,
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(
headers
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some("image/jpeg")
);
assert!(body.starts_with(&[0xff, 0xd8, 0xff]));
let (status, headers, body) = send_request_raw(
&test_app.app,
Method::GET,
"/v1/desktop/screenshot?scale=0.5",
None,
&[],
None,
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(
headers
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some("image/png")
);
assert_eq!(png_dimensions(&body), (720, 450));
let (status, _, body) = send_request_raw(
&test_app.app,
@ -165,6 +224,49 @@ async fn v1_desktop_lifecycle_and_actions_work_with_real_runtime() {
assert_eq!(clicked["x"], 220);
assert_eq!(clicked["y"], 230);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/down",
Some(json!({
"x": 220,
"y": 230,
"button": "left"
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let mouse_down = parse_json(&body);
assert_eq!(mouse_down["x"], 220);
assert_eq!(mouse_down["y"], 230);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/move",
Some(json!({ "x": 260, "y": 280 })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let moved_while_down = parse_json(&body);
assert_eq!(moved_while_down["x"], 260);
assert_eq!(moved_while_down["y"], 280);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/mouse/up",
Some(json!({ "button": "left" })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let mouse_up = parse_json(&body);
assert_eq!(mouse_up["x"], 260);
assert_eq!(mouse_up["y"], 280);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
@ -182,6 +284,11 @@ async fn v1_desktop_lifecycle_and_actions_work_with_real_runtime() {
assert_eq!(scrolled["x"], 220);
assert_eq!(scrolled["y"], 230);
let (status, _, body) =
send_request(&test_app.app, Method::GET, "/v1/desktop/windows", None, &[]).await;
assert_eq!(status, StatusCode::OK);
assert!(parse_json(&body)["windows"].is_array());
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
@ -219,6 +326,167 @@ async fn v1_desktop_lifecycle_and_actions_work_with_real_runtime() {
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["ok"], true);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/keyboard/press",
Some(json!({
"key": "l",
"modifiers": {
"ctrl": true
}
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["ok"], true);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/keyboard/down",
Some(json!({ "key": "shift" })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["ok"], true);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/keyboard/up",
Some(json!({ "key": "shift" })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["ok"], true);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/recording/start",
Some(json!({ "fps": 8 })),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let recording = parse_json(&body);
let recording_id = recording["id"].as_str().expect("recording id").to_string();
assert_eq!(recording["status"], "recording");
tokio::time::sleep(Duration::from_secs(2)).await;
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/recording/stop",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let stopped_recording = parse_json(&body);
assert_eq!(stopped_recording["id"], recording_id);
assert_eq!(stopped_recording["status"], "completed");
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/desktop/recordings",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert!(parse_json(&body)["recordings"].is_array());
let (status, headers, body) = send_request_raw(
&test_app.app,
Method::GET,
&format!("/v1/desktop/recordings/{recording_id}/download"),
None,
&[],
None,
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(
headers
.get(header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some("video/mp4")
);
assert!(body.windows(4).any(|window| window == b"ftyp"));
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/stream/start",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["active"], true);
let (mut ws, _) = connect_async(test_app.app.ws_url("/v1/desktop/stream/ws"))
.await
.expect("connect desktop stream websocket");
let ready = recv_ws_message(&mut ws).await;
match ready {
Message::Text(text) => {
let value: Value = serde_json::from_str(&text).expect("desktop stream ready frame");
assert_eq!(value["type"], "ready");
assert_eq!(value["width"], 1440);
assert_eq!(value["height"], 900);
}
other => panic!("expected text ready frame, got {other:?}"),
}
let frame = recv_ws_message(&mut ws).await;
match frame {
Message::Binary(bytes) => assert!(bytes.starts_with(&[0xff, 0xd8, 0xff])),
other => panic!("expected binary jpeg frame, got {other:?}"),
}
ws.send(Message::Text(
json!({
"type": "moveMouse",
"x": 320,
"y": 330
})
.to_string()
.into(),
))
.await
.expect("send desktop stream mouse move");
let _ = ws.close(None).await;
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/stream/stop",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["active"], false);
let (status, _, _) = send_request(
&test_app.app,
Method::DELETE,
&format!("/v1/desktop/recordings/{recording_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT);
let (status, _, body) =
send_request(&test_app.app, Method::POST, "/v1/desktop/stop", None, &[]).await;
assert_eq!(status, StatusCode::OK);

View file

@ -2,6 +2,7 @@ use super::*;
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine;
use futures::{SinkExt, StreamExt};
use serial_test::serial;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
@ -277,6 +278,92 @@ async fn v1_process_tty_input_and_logs() {
assert_eq!(status, StatusCode::NO_CONTENT);
}
#[tokio::test]
#[serial]
async fn v1_processes_owner_filter_separates_user_and_desktop_processes() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "sh",
"args": ["-lc", "sleep 30"],
"tty": false,
"interactive": false
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let user_process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/desktop/start",
Some(json!({
"width": 1024,
"height": 768
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["state"], "active");
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/processes?owner=user",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let user_processes = parse_json(&body)["processes"]
.as_array()
.cloned()
.unwrap_or_default();
assert!(user_processes.iter().any(|process| process["id"] == user_process_id));
assert!(user_processes.iter().all(|process| process["owner"] == "user"));
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/processes?owner=desktop",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let desktop_processes = parse_json(&body)["processes"]
.as_array()
.cloned()
.unwrap_or_default();
assert!(desktop_processes.len() >= 2);
assert!(desktop_processes.iter().all(|process| process["owner"] == "desktop"));
let (status, _, _) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{user_process_id}/kill"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, _, body) =
send_request(&test_app.app, Method::POST, "/v1/desktop/stop", None, &[]).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["state"], "inactive");
}
#[tokio::test]
async fn v1_process_not_found_returns_404() {
let test_app = TestApp::new(AuthConfig::disabled());