feat: enhance desktop computer-use streaming with neko integration

Improve desktop streaming architecture, add inspector dev tooling,
React DesktopViewer updates, and computer-use documentation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-16 23:59:43 -07:00
parent 4252c705df
commit 2d8508d6e2
17 changed files with 2712 additions and 688 deletions

View file

@ -41,6 +41,7 @@ base64.workspace = true
toml_edit.workspace = true
tar.workspace = true
zip.workspace = true
tokio-tungstenite = "0.24"
tempfile = { workspace = true, optional = true }
[target.'cfg(unix)'.dependencies]

View file

@ -172,9 +172,9 @@ impl DesktopRuntime {
let recording_manager =
DesktopRecordingManager::new(process_runtime.clone(), config.state_dir.clone());
Self {
streaming_manager: DesktopStreamingManager::new(process_runtime.clone()),
process_runtime,
recording_manager,
streaming_manager: DesktopStreamingManager::new(),
inner: Arc::new(Mutex::new(DesktopRuntimeStateData {
state: DesktopState::Inactive,
display_num: config.display_num,
@ -197,7 +197,10 @@ impl DesktopRuntime {
pub async fn status(&self) -> DesktopStatusResponse {
let mut state = self.inner.lock().await;
self.refresh_status_locked(&mut state).await;
self.snapshot_locked(&state)
let mut response = self.snapshot_locked(&state);
drop(state);
self.append_neko_process(&mut response).await;
response
}
pub async fn start(
@ -221,7 +224,10 @@ impl DesktopRuntime {
self.refresh_status_locked(&mut state).await;
if state.state == DesktopState::Active {
return Ok(self.snapshot_locked(&state));
let mut response = self.snapshot_locked(&state);
drop(state);
self.append_neko_process(&mut response).await;
return Ok(response);
}
if !state.missing_dependencies.is_empty() {
@ -307,7 +313,10 @@ impl DesktopRuntime {
),
);
Ok(self.snapshot_locked(&state))
let mut response = self.snapshot_locked(&state);
drop(state);
self.append_neko_process(&mut response).await;
Ok(response)
}
pub async fn stop(&self) -> Result<DesktopStatusResponse, DesktopProblem> {
@ -336,7 +345,10 @@ impl DesktopRuntime {
state.install_command = self.install_command_for(&state.missing_dependencies);
state.environment.clear();
Ok(self.snapshot_locked(&state))
let mut response = self.snapshot_locked(&state);
drop(state);
self.append_neko_process(&mut response).await;
Ok(response)
}
pub async fn shutdown(&self) {
@ -630,8 +642,26 @@ impl DesktopRuntime {
self.recording_manager.delete(id).await
}
pub async fn start_streaming(&self) -> DesktopStreamStatusResponse {
self.streaming_manager.start().await
pub async fn start_streaming(&self) -> Result<DesktopStreamStatusResponse, SandboxError> {
let state = self.inner.lock().await;
let display = state
.display
.as_deref()
.ok_or_else(|| SandboxError::Conflict {
message: "desktop runtime is not active".to_string(),
})?;
let resolution = state
.resolution
.clone()
.ok_or_else(|| SandboxError::Conflict {
message: "desktop runtime is not active".to_string(),
})?;
let environment = state.environment.clone();
let display = display.to_string();
drop(state);
self.streaming_manager
.start(&display, resolution, &environment)
.await
}
pub async fn stop_streaming(&self) -> DesktopStreamStatusResponse {
@ -642,6 +672,10 @@ impl DesktopRuntime {
self.streaming_manager.ensure_active().await
}
pub fn streaming_manager(&self) -> &DesktopStreamingManager {
&self.streaming_manager
}
async fn recording_context(&self) -> Result<DesktopRecordingContext, SandboxError> {
let mut state = self.inner.lock().await;
let ready = self
@ -1577,6 +1611,14 @@ impl DesktopRuntime {
processes
}
/// Append neko streaming process info to the response, if a neko process
/// has been started by the streaming manager.
async fn append_neko_process(&self, response: &mut DesktopStatusResponse) {
if let Some(neko_info) = self.streaming_manager.process_info().await {
response.processes.push(neko_info);
}
}
fn record_problem_locked(&self, state: &mut DesktopRuntimeStateData, problem: &DesktopProblem) {
state.last_error = Some(problem.to_error_info());
self.write_runtime_log_locked(

View file

@ -1,37 +1,205 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use sandbox_agent_error::SandboxError;
use crate::desktop_types::DesktopStreamStatusResponse;
use crate::desktop_types::{DesktopProcessInfo, DesktopResolution, DesktopStreamStatusResponse};
use crate::process_runtime::{ProcessOwner, ProcessRuntime, ProcessStartSpec};
/// Internal port where neko listens for HTTP/WS traffic.
const NEKO_INTERNAL_PORT: u16 = 18100;
/// UDP ephemeral port range for WebRTC media.
const NEKO_EPR: &str = "59050-59070";
/// How long to wait for neko to become ready.
const NEKO_READY_TIMEOUT: Duration = Duration::from_secs(15);
/// How long between readiness polls.
const NEKO_READY_POLL: Duration = Duration::from_millis(300);
#[derive(Debug, Clone)]
pub struct DesktopStreamingManager {
inner: Arc<Mutex<DesktopStreamingState>>,
process_runtime: Arc<ProcessRuntime>,
}
#[derive(Debug, Default)]
struct DesktopStreamingState {
active: bool,
process_id: Option<String>,
/// Base URL for neko's internal HTTP server (e.g. "http://127.0.0.1:18100").
neko_base_url: Option<String>,
/// Session cookie obtained from neko login, used for WS auth.
neko_session_cookie: Option<String>,
display: Option<String>,
resolution: Option<DesktopResolution>,
}
impl DesktopStreamingManager {
pub fn new() -> Self {
pub fn new(process_runtime: Arc<ProcessRuntime>) -> Self {
Self {
inner: Arc::new(Mutex::new(DesktopStreamingState::default())),
process_runtime,
}
}
pub async fn start(&self) -> DesktopStreamStatusResponse {
/// Start the neko streaming subprocess targeting the given display.
pub async fn start(
&self,
display: &str,
resolution: DesktopResolution,
environment: &HashMap<String, String>,
) -> Result<DesktopStreamStatusResponse, SandboxError> {
let mut state = self.inner.lock().await;
if state.active {
return Ok(DesktopStreamStatusResponse { active: true });
}
// Stop any stale process.
if let Some(ref old_id) = state.process_id {
let _ = self.process_runtime.stop_process(old_id, Some(2000)).await;
state.process_id = None;
state.neko_base_url = None;
state.neko_session_cookie = None;
}
let mut env = environment.clone();
env.insert("DISPLAY".to_string(), display.to_string());
let bind_addr = format!("0.0.0.0:{}", NEKO_INTERNAL_PORT);
let screen = format!("{}x{}@30", resolution.width, resolution.height);
let snapshot = self
.process_runtime
.start_process(ProcessStartSpec {
command: "neko".to_string(),
args: vec![
"serve".to_string(),
"--server.bind".to_string(),
bind_addr,
"--desktop.screen".to_string(),
screen,
"--desktop.display".to_string(),
display.to_string(),
"--capture.video.display".to_string(),
display.to_string(),
"--capture.video.codec".to_string(),
"vp8".to_string(),
"--capture.audio.codec".to_string(),
"opus".to_string(),
"--webrtc.epr".to_string(),
NEKO_EPR.to_string(),
"--webrtc.icelite".to_string(),
"--webrtc.nat1to1".to_string(),
"127.0.0.1".to_string(),
"--member.provider".to_string(),
"noauth".to_string(),
// Disable the custom xf86-input-neko driver (defaults to true
// in neko v3). The driver socket is not available outside
// neko's official Docker images; XTEST is used instead.
"--desktop.input.enabled=false".to_string(),
],
cwd: None,
env,
tty: false,
interactive: false,
owner: ProcessOwner::Desktop,
restart_policy: None,
})
.await
.map_err(|e| SandboxError::Conflict {
message: format!("failed to start neko streaming process: {e}"),
})?;
let neko_base = format!("http://127.0.0.1:{}", NEKO_INTERNAL_PORT);
state.process_id = Some(snapshot.id.clone());
state.neko_base_url = Some(neko_base.clone());
state.display = Some(display.to_string());
state.resolution = Some(resolution);
state.active = true;
DesktopStreamStatusResponse { active: true }
// Drop the lock before waiting for readiness.
drop(state);
// Wait for neko to be ready by polling its login endpoint.
let deadline = tokio::time::Instant::now() + NEKO_READY_TIMEOUT;
let login_url = format!("{}/api/login", neko_base);
let client = reqwest::Client::builder()
.redirect(reqwest::redirect::Policy::none())
.build()
.unwrap_or_else(|_| reqwest::Client::new());
let mut session_cookie = None;
loop {
match client
.post(&login_url)
.json(&serde_json::json!({"username": "admin", "password": "admin"}))
.send()
.await
{
Ok(resp) if resp.status().is_success() => {
// Extract NEKO_SESSION cookie from Set-Cookie header.
if let Some(set_cookie) = resp.headers().get("set-cookie") {
if let Ok(cookie_str) = set_cookie.to_str() {
// Extract just the cookie value (before the first ';').
if let Some(cookie_part) = cookie_str.split(';').next() {
session_cookie = Some(cookie_part.to_string());
}
}
}
tracing::info!("neko streaming process ready, session obtained");
// Take control so the connected client can send input.
let control_url = format!("{}/api/room/control/take", neko_base);
if let Some(ref cookie) = session_cookie {
let _ = client
.post(&control_url)
.header("Cookie", cookie.as_str())
.send()
.await;
tracing::info!("neko control taken");
}
break;
}
_ => {}
}
if tokio::time::Instant::now() >= deadline {
tracing::warn!("neko did not become ready within timeout, proceeding anyway");
break;
}
tokio::time::sleep(NEKO_READY_POLL).await;
}
// Store the session cookie.
if let Some(ref cookie) = session_cookie {
let mut state = self.inner.lock().await;
state.neko_session_cookie = Some(cookie.clone());
}
Ok(DesktopStreamStatusResponse { active: true })
}
/// Stop streaming and tear down neko subprocess.
pub async fn stop(&self) -> DesktopStreamStatusResponse {
let mut state = self.inner.lock().await;
if let Some(ref process_id) = state.process_id.take() {
let _ = self
.process_runtime
.stop_process(process_id, Some(3000))
.await;
}
state.active = false;
state.neko_base_url = None;
state.neko_session_cookie = None;
state.display = None;
state.resolution = None;
DesktopStreamStatusResponse { active: false }
}
@ -44,4 +212,109 @@ impl DesktopStreamingManager {
})
}
}
/// Get the neko WebSocket URL for signaling proxy, including session cookie.
pub async fn neko_ws_url(&self) -> Option<String> {
self.inner
.lock()
.await
.neko_base_url
.as_ref()
.map(|base| base.replace("http://", "ws://") + "/api/ws")
}
/// Get the neko base HTTP URL (e.g. `http://127.0.0.1:18100`).
pub async fn neko_base_url(&self) -> Option<String> {
self.inner.lock().await.neko_base_url.clone()
}
/// Create a fresh neko login session and return the session cookie.
/// Each WebSocket proxy connection should call this to get its own
/// session, avoiding conflicts when multiple clients connect.
/// Uses a unique username per connection so neko treats them as
/// separate members (noauth provider allows any credentials).
pub async fn create_neko_session(&self) -> Option<String> {
let base_url = self.neko_base_url().await?;
let client = reqwest::Client::new();
let login_url = format!("{}/api/login", base_url);
let username = format!(
"user-{}",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()
);
tracing::debug!(
"creating neko session: username={}, url={}",
username,
login_url
);
let resp = match client
.post(&login_url)
.json(&serde_json::json!({"username": username, "password": "admin"}))
.send()
.await
{
Ok(r) => r,
Err(e) => {
tracing::warn!("neko login request failed: {e}");
return None;
}
};
if !resp.status().is_success() {
tracing::warn!("neko login returned status {}", resp.status());
return None;
}
let cookie = resp
.headers()
.get("set-cookie")
.and_then(|v| v.to_str().ok())
.map(|v| v.split(';').next().unwrap_or(v).to_string());
let cookie = match cookie {
Some(c) => c,
None => {
tracing::warn!("neko login response missing set-cookie header");
return None;
}
};
tracing::debug!("neko session created: {}", username);
// Take control for this session.
let control_url = format!("{}/api/room/control/take", base_url);
let _ = client
.post(&control_url)
.header("Cookie", &cookie)
.send()
.await;
Some(cookie)
}
/// Get the shared neko session cookie (used during startup).
pub async fn neko_session_cookie(&self) -> Option<String> {
self.inner.lock().await.neko_session_cookie.clone()
}
pub async fn resolution(&self) -> Option<DesktopResolution> {
self.inner.lock().await.resolution.clone()
}
pub async fn is_active(&self) -> bool {
self.inner.lock().await.active
}
/// Return process diagnostics for the neko streaming subprocess, if one
/// has been started. The returned info mirrors the shape used by
/// `DesktopRuntime::processes_locked` for xvfb/openbox/dbus.
pub async fn process_info(&self) -> Option<DesktopProcessInfo> {
let state = self.inner.lock().await;
let process_id = state.process_id.as_ref()?;
let snapshot = self.process_runtime.snapshot(process_id).await.ok()?;
Some(DesktopProcessInfo {
name: "neko".to_string(),
pid: snapshot.pid,
running: snapshot.status == crate::process_runtime::ProcessStatus::Running,
log_path: None,
})
}
}

View file

@ -235,7 +235,7 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
)
.route("/desktop/stream/start", post(post_v1_desktop_stream_start))
.route("/desktop/stream/stop", post(post_v1_desktop_stream_stop))
.route("/desktop/stream/ws", get(get_v1_desktop_stream_ws))
.route("/desktop/stream/signaling", get(get_v1_desktop_stream_ws))
.route("/agents", get(get_v1_agents))
.route("/agents/:agent", get(get_v1_agent))
.route("/agents/:agent/install", post(post_v1_agent_install))
@ -1181,7 +1181,7 @@ async fn delete_v1_desktop_recording(
async fn post_v1_desktop_stream_start(
State(state): State<Arc<AppState>>,
) -> Result<Json<DesktopStreamStatusResponse>, ApiError> {
Ok(Json(state.desktop_runtime().start_streaming().await))
Ok(Json(state.desktop_runtime().start_streaming().await?))
}
/// Stop desktop streaming.
@ -1201,13 +1201,14 @@ async fn post_v1_desktop_stream_stop(
Ok(Json(state.desktop_runtime().stop_streaming().await))
}
/// Open a desktop websocket streaming session.
/// Open a desktop WebRTC signaling session.
///
/// Upgrades the connection to a websocket that streams JPEG desktop frames and
/// accepts mouse and keyboard control frames.
/// Upgrades the connection to a WebSocket used for WebRTC signaling between
/// the browser client and the desktop streaming process. Also accepts mouse
/// and keyboard input frames as a fallback transport.
#[utoipa::path(
get,
path = "/v1/desktop/stream/ws",
path = "/v1/desktop/stream/signaling",
tag = "v1",
params(
("access_token" = Option<String>, Query, description = "Bearer token alternative for WS auth")
@ -2451,46 +2452,6 @@ enum TerminalClientFrame {
Close,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
enum DesktopStreamClientFrame {
MoveMouse {
x: i32,
y: i32,
},
MouseDown {
#[serde(default)]
x: Option<i32>,
#[serde(default)]
y: Option<i32>,
#[serde(default)]
button: Option<DesktopMouseButton>,
},
MouseUp {
#[serde(default)]
x: Option<i32>,
#[serde(default)]
y: Option<i32>,
#[serde(default)]
button: Option<DesktopMouseButton>,
},
Scroll {
x: i32,
y: i32,
#[serde(default)]
delta_x: Option<i32>,
#[serde(default)]
delta_y: Option<i32>,
},
KeyDown {
key: String,
},
KeyUp {
key: String,
},
Close,
}
async fn process_terminal_ws_session(
mut socket: WebSocket,
runtime: Arc<ProcessRuntime>,
@ -2603,131 +2564,115 @@ async fn process_terminal_ws_session(
}
}
async fn desktop_stream_ws_session(mut socket: WebSocket, desktop_runtime: Arc<DesktopRuntime>) {
let display_info = match desktop_runtime.display_info().await {
Ok(info) => info,
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
let _ = socket.close().await;
/// WebRTC signaling proxy session.
///
/// Proxies the WebSocket bidirectionally between the browser client and neko's
/// internal WebSocket endpoint. All neko signaling messages (SDP offers/answers,
/// ICE candidates, system events) are relayed transparently.
async fn desktop_stream_ws_session(mut client_ws: WebSocket, desktop_runtime: Arc<DesktopRuntime>) {
use futures::SinkExt;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
// Get neko's internal WS URL from the streaming manager.
let neko_ws_url = match desktop_runtime.streaming_manager().neko_ws_url().await {
Some(url) => url,
None => {
let _ = send_ws_error(&mut client_ws, "streaming process is not available").await;
let _ = client_ws.close().await;
return;
}
};
if send_ws_json(
&mut socket,
json!({
"type": "ready",
"width": display_info.resolution.width,
"height": display_info.resolution.height,
}),
)
.await
.is_err()
{
return;
}
// Create a fresh neko login session for this connection.
// Each proxy connection gets its own neko session to avoid conflicts
// when multiple clients connect (neko sends signal/close to shared sessions).
let session_cookie = desktop_runtime
.streaming_manager()
.create_neko_session()
.await;
let mut frame_tick = tokio::time::interval(Duration::from_millis(100));
// Build a WS request with the neko session cookie for authentication.
let ws_req = {
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
let mut req = neko_ws_url
.into_client_request()
.expect("valid neko WS URL");
if let Some(ref cookie) = session_cookie {
req.headers_mut()
.insert("Cookie", cookie.parse().expect("valid cookie header"));
}
req
};
// Connect to neko's internal WebSocket.
let (neko_ws, _) = match tokio_tungstenite::connect_async(ws_req).await {
Ok(conn) => conn,
Err(err) => {
let _ = send_ws_error(
&mut client_ws,
&format!("failed to connect to streaming process: {err}"),
)
.await;
let _ = client_ws.close().await;
return;
}
};
let (mut neko_sink, mut neko_stream) = neko_ws.split();
// Relay messages bidirectionally between client and neko.
loop {
tokio::select! {
ws_in = socket.recv() => {
match ws_in {
// Client → Neko (signaling passthrough; input goes via WebRTC data channel)
client_msg = client_ws.recv() => {
match client_msg {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<DesktopStreamClientFrame>(&text) {
Ok(DesktopStreamClientFrame::MoveMouse { x, y }) => {
if let Err(err) = desktop_runtime
.move_mouse(DesktopMouseMoveRequest { x, y })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::MouseDown { x, y, button }) => {
if let Err(err) = desktop_runtime
.mouse_down(DesktopMouseDownRequest { x, y, button })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::MouseUp { x, y, button }) => {
if let Err(err) = desktop_runtime
.mouse_up(DesktopMouseUpRequest { x, y, button })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::Scroll { x, y, delta_x, delta_y }) => {
if let Err(err) = desktop_runtime
.scroll_mouse(DesktopMouseScrollRequest {
x,
y,
delta_x,
delta_y,
})
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::KeyDown { key }) => {
if let Err(err) = desktop_runtime
.key_down(DesktopKeyboardDownRequest { key })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::KeyUp { key }) => {
if let Err(err) = desktop_runtime
.key_up(DesktopKeyboardUpRequest { key })
.await
{
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
}
}
Ok(DesktopStreamClientFrame::Close) => {
let _ = socket.close().await;
break;
}
Err(err) => {
let _ = send_ws_error(&mut socket, &format!("invalid desktop stream frame: {err}")).await;
}
}
}
Some(Ok(Message::Ping(payload))) => {
let _ = socket.send(Message::Pong(payload)).await;
}
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(Message::Binary(_))) | Some(Ok(Message::Pong(_))) => {}
Some(Err(_)) => break,
}
}
_ = frame_tick.tick() => {
let frame = desktop_runtime
.screenshot(DesktopScreenshotQuery {
format: Some(DesktopScreenshotFormat::Jpeg),
quality: Some(60),
scale: Some(1.0),
})
.await;
match frame {
Ok(frame) => {
if socket.send(Message::Binary(frame.bytes.into())).await.is_err() {
if neko_sink.send(TungsteniteMessage::Text(text.into())).await.is_err() {
break;
}
}
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_error_info().message).await;
let _ = socket.close().await;
break;
Some(Ok(Message::Binary(data))) => {
if neko_sink.send(TungsteniteMessage::Binary(data.into())).await.is_err() {
break;
}
}
Some(Ok(Message::Ping(payload))) => {
let _ = client_ws.send(Message::Pong(payload)).await;
}
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(Message::Pong(_))) => {}
Some(Err(_)) => break,
}
}
// Neko → Client
neko_msg = neko_stream.next() => {
match neko_msg {
Some(Ok(TungsteniteMessage::Text(text))) => {
if client_ws.send(Message::Text(text.into())).await.is_err() {
break;
}
}
Some(Ok(TungsteniteMessage::Binary(data))) => {
if client_ws.send(Message::Binary(data.into())).await.is_err() {
break;
}
}
Some(Ok(TungsteniteMessage::Ping(payload))) => {
if neko_sink.send(TungsteniteMessage::Pong(payload.clone())).await.is_err() {
break;
}
}
Some(Ok(TungsteniteMessage::Close(_))) | None => break,
Some(Ok(TungsteniteMessage::Pong(_))) => {}
Some(Ok(TungsteniteMessage::Frame(_))) => {}
Some(Err(_)) => break,
}
}
}
}
let _ = neko_sink.close().await;
let _ = client_ws.close().await;
}
async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {