feat: add PTY/terminal session support to Process Manager API

Add Docker-style terminal support with -t (TTY) and -i (interactive) flags:

Backend (Rust):
- Add portable-pty dependency for PTY allocation on Unix
- Extend StartProcessRequest with tty, interactive, and terminalSize options
- Add PTY process spawning with TERM=xterm-256color
- Add WebSocket endpoint for bidirectional terminal I/O
- Add terminal resize endpoint (POST /process/:id/resize)
- Add terminal input endpoint (POST /process/:id/input)
- Support base64-encoded binary input
- Process info now includes tty, interactive, and terminalSize fields
- Terminal output is logged to combined.log for persistence

Frontend (Inspector UI):
- Add @xterm/xterm and addons for terminal rendering
- Create Terminal component with xterm.js integration
- Add tabbed view (Terminal/Logs) for PTY processes
- Terminal auto-connects via WebSocket when process is expanded
- Support terminal resize with ResizeObserver
- Show PTY badge on processes with TTY enabled
- Graceful handling of process exit and disconnection

API:
- GET /v1/process/:id/terminal - WebSocket for terminal I/O
- POST /v1/process/:id/resize - Resize terminal (cols, rows)
- POST /v1/process/:id/input - Write data to terminal

WebSocket protocol:
- type: 'data' - Terminal output (server -> client)
- type: 'input' - Terminal input (client -> server)
- type: 'resize' - Resize request (client -> server)
- type: 'exit' - Process exited (server -> client)
- type: 'error' - Error message (server -> client)
This commit is contained in:
Nathan Flurry 2026-01-30 13:12:49 -08:00
parent db0268b88f
commit ac0a22cd07
11 changed files with 1541 additions and 109 deletions

View file

@ -27,6 +27,8 @@ dirs.workspace = true
time.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-tungstenite.workspace = true
base64.workspace = true
tower-http.workspace = true
utoipa.workspace = true
schemars.workspace = true
@ -38,6 +40,7 @@ tempfile = { workspace = true, optional = true }
[target.'cfg(unix)'.dependencies]
libc = "0.2"
portable-pty = { workspace = true }
[dev-dependencies]
http-body-util.workspace = true

View file

@ -5,4 +5,5 @@ mod agent_server_logs;
pub mod process_manager;
pub mod router;
pub mod telemetry;
pub mod terminal;
pub mod ui;

View file

@ -1,8 +1,11 @@
//! Process Manager - API for spawning and managing background processes.
//!
//! Supports both regular processes and PTY-based terminal sessions.
//! PTY sessions enable interactive terminal applications with full TTY support.
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::io::{Read, Write};
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::atomic::{AtomicU64, Ordering};
@ -15,13 +18,20 @@ use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader};
use tokio::process::{Child, Command};
use tokio::sync::{broadcast, Mutex, RwLock};
use tokio::sync::{broadcast, mpsc, Mutex, RwLock};
use utoipa::ToSchema;
use sandbox_agent_error::SandboxError;
#[cfg(unix)]
use portable_pty::{native_pty_system, CommandBuilder, PtyPair, PtySize};
static PROCESS_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
/// Default terminal size (columns x rows)
const DEFAULT_COLS: u16 = 80;
const DEFAULT_ROWS: u16 = 24;
/// Process status
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "lowercase")]
@ -36,6 +46,23 @@ pub enum ProcessStatus {
Killed,
}
/// Terminal size configuration
#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TerminalSize {
pub cols: u16,
pub rows: u16,
}
impl Default for TerminalSize {
fn default() -> Self {
Self {
cols: DEFAULT_COLS,
rows: DEFAULT_ROWS,
}
}
}
/// Log file paths for a process
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
@ -61,6 +88,15 @@ pub struct ProcessInfo {
pub stopped_at: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
/// Whether this process has a PTY allocated (terminal mode)
#[serde(default)]
pub tty: bool,
/// Whether stdin is kept open for interactive input
#[serde(default)]
pub interactive: bool,
/// Current terminal size (if tty is true)
#[serde(skip_serializing_if = "Option::is_none")]
pub terminal_size: Option<TerminalSize>,
}
/// Request to start a new process
@ -74,6 +110,15 @@ pub struct StartProcessRequest {
pub cwd: Option<String>,
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub env: HashMap<String, String>,
/// Allocate a pseudo-TTY for the process (like docker -t)
#[serde(default)]
pub tty: bool,
/// Keep stdin open for interactive input (like docker -i)
#[serde(default)]
pub interactive: bool,
/// Initial terminal size (only used if tty is true)
#[serde(skip_serializing_if = "Option::is_none")]
pub terminal_size: Option<TerminalSize>,
}
/// Response after starting a process
@ -83,6 +128,12 @@ pub struct StartProcessResponse {
pub id: String,
pub status: ProcessStatus,
pub log_paths: ProcessLogPaths,
/// Whether this process has a PTY allocated
#[serde(default)]
pub tty: bool,
/// Whether stdin is available for input
#[serde(default)]
pub interactive: bool,
}
/// Response listing all processes
@ -118,16 +169,92 @@ pub struct LogsResponse {
pub lines: usize,
}
/// Request to resize a terminal
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ResizeTerminalRequest {
pub cols: u16,
pub rows: u16,
}
/// Request to write data to a process's stdin/terminal
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct WriteInputRequest {
/// Data to write (can be raw bytes encoded as base64 or UTF-8 text)
pub data: String,
/// Whether data is base64 encoded (for binary data)
#[serde(default)]
pub base64: bool,
}
/// Message types for terminal WebSocket communication
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum TerminalMessage {
/// Data from the terminal (output)
#[serde(rename_all = "camelCase")]
Data { data: String },
/// Data to write to the terminal (input)
#[serde(rename_all = "camelCase")]
Input { data: String },
/// Resize the terminal
#[serde(rename_all = "camelCase")]
Resize { cols: u16, rows: u16 },
/// Terminal closed/process exited
#[serde(rename_all = "camelCase")]
Exit { code: Option<i32> },
/// Error message
#[serde(rename_all = "camelCase")]
Error { message: String },
}
/// Internal state for a managed process (non-PTY mode)
struct RegularProcess {
child: Child,
log_broadcaster: broadcast::Sender<String>,
}
/// Internal state for a PTY process
#[cfg(unix)]
struct PtyProcess {
/// The PTY pair (master + child handle)
pty_pair: PtyPair,
/// Child process handle
child: Box<dyn portable_pty::Child + Send>,
/// Writer for sending data to the PTY
writer: Box<dyn Write + Send>,
/// Current terminal size
size: TerminalSize,
/// Channel for sending terminal output to subscribers
output_tx: broadcast::Sender<Vec<u8>>,
/// Channel for receiving input to write to terminal
input_tx: mpsc::UnboundedSender<Vec<u8>>,
}
/// Internal state for a managed process
#[derive(Debug)]
struct ManagedProcess {
info: ProcessInfo,
/// Handle to the running process (None if process has exited)
child: Option<Child>,
/// Broadcaster for log lines (for SSE streaming)
/// Regular process handle (non-PTY)
regular: Option<RegularProcess>,
/// PTY process handle (terminal mode)
#[cfg(unix)]
pty: Option<PtyProcess>,
/// Broadcaster for log lines (for SSE streaming, used in regular mode)
log_broadcaster: broadcast::Sender<String>,
}
impl std::fmt::Debug for ManagedProcess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ManagedProcess")
.field("info", &self.info)
.field("has_regular", &self.regular.is_some())
#[cfg(unix)]
.field("has_pty", &self.pty.is_some())
.finish()
}
}
/// State file entry for persistence
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ProcessStateEntry {
@ -139,6 +266,8 @@ struct ProcessStateEntry {
started_at: u64,
stopped_at: Option<u64>,
cwd: Option<String>,
tty: bool,
interactive: bool,
}
/// Process Manager handles spawning and tracking background processes
@ -218,8 +347,13 @@ impl ProcessManager {
started_at: entry.started_at,
stopped_at: entry.stopped_at,
cwd: entry.cwd,
tty: entry.tty,
interactive: entry.interactive,
terminal_size: None,
},
child: None,
regular: None,
#[cfg(unix)]
pty: None,
log_broadcaster: tx,
};
@ -258,6 +392,8 @@ impl ProcessManager {
started_at: guard.info.started_at,
stopped_at: guard.info.stopped_at,
cwd: guard.info.cwd.clone(),
tty: guard.info.tty,
interactive: guard.info.interactive,
});
}
@ -287,10 +423,39 @@ impl ProcessManager {
File::create(&log_paths.stderr).map_err(|e| SandboxError::StreamError {
message: format!("Failed to create stderr log: {}", e),
})?;
File::create(&log_paths.combined).map_err(|e| SandboxError::StreamError {
message: format!("Failed to create combined log: {}", e),
})?;
let started_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
#[cfg(unix)]
if request.tty {
return self.start_pty_process(id, request, log_paths, started_at).await;
}
// Fall back to regular process if TTY not requested or not on Unix
self.start_regular_process(id, request, log_paths, started_at).await
}
/// Start a regular (non-PTY) process
async fn start_regular_process(
&self,
id: String,
request: StartProcessRequest,
log_paths: ProcessLogPaths,
started_at: u64,
) -> Result<StartProcessResponse, SandboxError> {
let combined_file = Arc::new(std::sync::Mutex::new(
File::create(&log_paths.combined).map_err(|e| SandboxError::StreamError {
message: format!("Failed to create combined log: {}", e),
})?
OpenOptions::new()
.append(true)
.open(&log_paths.combined)
.map_err(|e| SandboxError::StreamError {
message: format!("Failed to open combined log: {}", e),
})?
));
// Build the command
@ -312,11 +477,6 @@ impl ProcessManager {
message: format!("Failed to spawn process: {}", e),
})?;
let started_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let (log_tx, _) = broadcast::channel::<String>(256);
// Set up stdout reader
@ -333,11 +493,19 @@ impl ProcessManager {
started_at,
stopped_at: None,
cwd: request.cwd.clone(),
tty: false,
interactive: request.interactive,
terminal_size: None,
};
let managed = Arc::new(Mutex::new(ManagedProcess {
info: info.clone(),
child: Some(child),
regular: Some(RegularProcess {
child,
log_broadcaster: log_tx.clone(),
}),
#[cfg(unix)]
pty: None,
log_broadcaster: log_tx.clone(),
}));
@ -406,8 +574,8 @@ impl ProcessManager {
tokio::time::sleep(Duration::from_millis(100)).await;
let mut guard = managed_clone.lock().await;
if let Some(ref mut child) = guard.child {
match child.try_wait() {
if let Some(ref mut regular) = guard.regular {
match regular.child.try_wait() {
Ok(Some(status)) => {
guard.info.status = ProcessStatus::Stopped;
guard.info.exit_code = status.code();
@ -417,11 +585,8 @@ impl ProcessManager {
.unwrap_or_default()
.as_secs()
);
guard.child = None;
guard.regular = None;
drop(guard);
// Save state - we need to do this manually since we don't have self
// This is a simplified version that just updates the state file
let _ = save_state_to_file(&base_dir).await;
break;
}
@ -447,6 +612,201 @@ impl ProcessManager {
id,
status: ProcessStatus::Running,
log_paths,
tty: false,
interactive: request.interactive,
})
}
/// Start a PTY process (Unix only)
#[cfg(unix)]
async fn start_pty_process(
&self,
id: String,
request: StartProcessRequest,
log_paths: ProcessLogPaths,
started_at: u64,
) -> Result<StartProcessResponse, SandboxError> {
let size = request.terminal_size.unwrap_or_default();
// Create the PTY
let pty_system = native_pty_system();
let pty_pair = pty_system
.openpty(PtySize {
rows: size.rows,
cols: size.cols,
pixel_width: 0,
pixel_height: 0,
})
.map_err(|e| SandboxError::StreamError {
message: format!("Failed to create PTY: {}", e),
})?;
// Build the command
let mut cmd = CommandBuilder::new(&request.command);
cmd.args(&request.args);
if let Some(ref cwd) = request.cwd {
cmd.cwd(cwd);
}
for (key, value) in &request.env {
cmd.env(key, value);
}
// Set TERM environment variable
cmd.env("TERM", "xterm-256color");
// Spawn the child process
let child = pty_pair
.slave
.spawn_command(cmd)
.map_err(|e| SandboxError::StreamError {
message: format!("Failed to spawn PTY process: {}", e),
})?;
// Get the master writer
let writer = pty_pair.master.take_writer().map_err(|e| SandboxError::StreamError {
message: format!("Failed to get PTY writer: {}", e),
})?;
// Get the master reader
let mut reader = pty_pair.master.try_clone_reader().map_err(|e| SandboxError::StreamError {
message: format!("Failed to get PTY reader: {}", e),
})?;
// Create channels for terminal I/O
let (output_tx, _) = broadcast::channel::<Vec<u8>>(256);
let (input_tx, mut input_rx) = mpsc::unbounded_channel::<Vec<u8>>();
let (log_tx, _) = broadcast::channel::<String>(256);
let info = ProcessInfo {
id: id.clone(),
command: request.command.clone(),
args: request.args.clone(),
status: ProcessStatus::Running,
exit_code: None,
log_paths: log_paths.clone(),
started_at,
stopped_at: None,
cwd: request.cwd.clone(),
tty: true,
interactive: request.interactive,
terminal_size: Some(size),
};
let managed = Arc::new(Mutex::new(ManagedProcess {
info: info.clone(),
regular: None,
pty: Some(PtyProcess {
pty_pair,
child,
writer,
size,
output_tx: output_tx.clone(),
input_tx: input_tx.clone(),
}),
log_broadcaster: log_tx.clone(),
}));
// Insert into map
{
let mut processes = self.processes.write().await;
processes.insert(id.clone(), managed.clone());
}
// Spawn a task to read PTY output
let output_tx_clone = output_tx.clone();
let combined_path = log_paths.combined.clone();
std::thread::spawn(move || {
let mut buf = [0u8; 4096];
let mut combined_file = OpenOptions::new()
.create(true)
.append(true)
.open(&combined_path)
.ok();
loop {
match reader.read(&mut buf) {
Ok(0) => break, // EOF
Ok(n) => {
let data = buf[..n].to_vec();
// Write to log file
if let Some(ref mut file) = combined_file {
let _ = file.write_all(&data);
}
// Broadcast to subscribers
let _ = output_tx_clone.send(data);
}
Err(e) => {
tracing::debug!("PTY read error: {}", e);
break;
}
}
}
});
// Spawn a task to write input to PTY
let managed_clone = managed.clone();
tokio::spawn(async move {
while let Some(data) = input_rx.recv().await {
let mut guard = managed_clone.lock().await;
if let Some(ref mut pty) = guard.pty {
if pty.writer.write_all(&data).is_err() {
break;
}
let _ = pty.writer.flush();
}
}
});
// Spawn a task to monitor process exit
let managed_clone = managed.clone();
let base_dir = self.base_dir.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
let mut guard = managed_clone.lock().await;
if let Some(ref mut pty) = guard.pty {
match pty.child.try_wait() {
Ok(Some(status)) => {
guard.info.status = ProcessStatus::Stopped;
guard.info.exit_code = status.exit_code().map(|c| c as i32);
guard.info.stopped_at = Some(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
);
guard.pty = None;
drop(guard);
let _ = save_state_to_file(&base_dir).await;
break;
}
Ok(None) => {
// Still running
}
Err(_) => {
break;
}
}
} else {
break;
}
}
});
// Save state
if let Err(e) = self.save_state().await {
tracing::warn!("Failed to save process state: {}", e);
}
Ok(StartProcessResponse {
id,
status: ProcessStatus::Running,
log_paths,
tty: true,
interactive: request.interactive,
})
}
@ -477,6 +837,12 @@ impl ProcessManager {
Ok(guard.info.clone())
}
/// Check if a process has TTY enabled
pub async fn is_tty_process(&self, id: &str) -> Result<bool, SandboxError> {
let info = self.get_process(id).await?;
Ok(info.tty)
}
/// Stop a process with SIGTERM
pub async fn stop_process(&self, id: &str) -> Result<(), SandboxError> {
let processes = self.processes.read().await;
@ -484,22 +850,27 @@ impl ProcessManager {
session_id: format!("process:{}", id),
})?;
let mut guard = managed.lock().await;
let guard = managed.lock().await;
if let Some(ref child) = guard.child {
// Try regular process first
if let Some(ref regular) = guard.regular {
#[cfg(unix)]
{
// Send SIGTERM
if let Some(pid) = child.id() {
if let Some(pid) = regular.child.id() {
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
}
}
#[cfg(not(unix))]
{
// On non-Unix, we can't send SIGTERM, so just mark as stopping
// The process will be killed when delete is called if needed
}
// Try PTY process
#[cfg(unix)]
if let Some(ref pty) = guard.pty {
if let Some(pid) = pty.child.process_id() {
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
}
}
@ -521,8 +892,9 @@ impl ProcessManager {
let mut guard = managed.lock().await;
if let Some(ref mut child) = guard.child {
let _ = child.kill().await;
// Try regular process first
if let Some(ref mut regular) = guard.regular {
let _ = regular.child.kill().await;
guard.info.status = ProcessStatus::Killed;
guard.info.stopped_at = Some(
SystemTime::now()
@ -530,7 +902,21 @@ impl ProcessManager {
.unwrap_or_default()
.as_secs()
);
guard.child = None;
guard.regular = None;
}
// Try PTY process
#[cfg(unix)]
if let Some(ref mut pty) = guard.pty {
let _ = pty.child.kill();
guard.info.status = ProcessStatus::Killed;
guard.info.stopped_at = Some(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
);
guard.pty = None;
}
drop(guard);
@ -549,7 +935,10 @@ impl ProcessManager {
let processes = self.processes.read().await;
if let Some(managed) = processes.get(id) {
let guard = managed.lock().await;
if guard.child.is_some() {
let is_running = guard.regular.is_some();
#[cfg(unix)]
let is_running = is_running || guard.pty.is_some();
if is_running {
return Err(SandboxError::InvalidRequest {
message: "Cannot delete a running process. Stop or kill it first.".to_string(),
});
@ -624,6 +1013,127 @@ impl ProcessManager {
let guard = managed.lock().await;
Ok(guard.log_broadcaster.subscribe())
}
/// Resize a PTY terminal
#[cfg(unix)]
pub async fn resize_terminal(&self, id: &str, cols: u16, rows: u16) -> Result<(), SandboxError> {
let processes = self.processes.read().await;
let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound {
session_id: format!("process:{}", id),
})?;
let mut guard = managed.lock().await;
if let Some(ref mut pty) = guard.pty {
pty.pty_pair
.master
.resize(PtySize {
rows,
cols,
pixel_width: 0,
pixel_height: 0,
})
.map_err(|e| SandboxError::StreamError {
message: format!("Failed to resize terminal: {}", e),
})?;
pty.size = TerminalSize { cols, rows };
guard.info.terminal_size = Some(pty.size);
Ok(())
} else {
Err(SandboxError::InvalidRequest {
message: "Process does not have a PTY".to_string(),
})
}
}
#[cfg(not(unix))]
pub async fn resize_terminal(&self, _id: &str, _cols: u16, _rows: u16) -> Result<(), SandboxError> {
Err(SandboxError::InvalidRequest {
message: "PTY support is only available on Unix systems".to_string(),
})
}
/// Write data to a process's terminal input
#[cfg(unix)]
pub async fn write_terminal_input(&self, id: &str, data: Vec<u8>) -> Result<(), SandboxError> {
let processes = self.processes.read().await;
let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound {
session_id: format!("process:{}", id),
})?;
let guard = managed.lock().await;
if let Some(ref pty) = guard.pty {
pty.input_tx.send(data).map_err(|_| SandboxError::StreamError {
message: "Failed to send input to terminal".to_string(),
})?;
Ok(())
} else {
Err(SandboxError::InvalidRequest {
message: "Process does not have a PTY".to_string(),
})
}
}
#[cfg(not(unix))]
pub async fn write_terminal_input(&self, _id: &str, _data: Vec<u8>) -> Result<(), SandboxError> {
Err(SandboxError::InvalidRequest {
message: "PTY support is only available on Unix systems".to_string(),
})
}
/// Subscribe to terminal output
#[cfg(unix)]
pub async fn subscribe_terminal_output(&self, id: &str) -> Result<broadcast::Receiver<Vec<u8>>, SandboxError> {
let processes = self.processes.read().await;
let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound {
session_id: format!("process:{}", id),
})?;
let guard = managed.lock().await;
if let Some(ref pty) = guard.pty {
Ok(pty.output_tx.subscribe())
} else {
Err(SandboxError::InvalidRequest {
message: "Process does not have a PTY".to_string(),
})
}
}
#[cfg(not(unix))]
pub async fn subscribe_terminal_output(&self, _id: &str) -> Result<broadcast::Receiver<Vec<u8>>, SandboxError> {
Err(SandboxError::InvalidRequest {
message: "PTY support is only available on Unix systems".to_string(),
})
}
/// Get the input channel for a PTY process (for WebSocket handler)
#[cfg(unix)]
pub async fn get_terminal_input_sender(&self, id: &str) -> Result<mpsc::UnboundedSender<Vec<u8>>, SandboxError> {
let processes = self.processes.read().await;
let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound {
session_id: format!("process:{}", id),
})?;
let guard = managed.lock().await;
if let Some(ref pty) = guard.pty {
Ok(pty.input_tx.clone())
} else {
Err(SandboxError::InvalidRequest {
message: "Process does not have a PTY".to_string(),
})
}
}
#[cfg(not(unix))]
pub async fn get_terminal_input_sender(&self, _id: &str) -> Result<mpsc::UnboundedSender<Vec<u8>>, SandboxError> {
Err(SandboxError::InvalidRequest {
message: "PTY support is only available on Unix systems".to_string(),
})
}
}
impl Default for ProcessManager {
@ -648,15 +1158,12 @@ fn format_timestamp() -> String {
}
/// Strip timestamp prefixes from log lines
/// Timestamps are in format: [2026-01-30T12:32:45.123Z] or [2026-01-30T12:32:45Z]
fn strip_timestamps(content: &str) -> String {
content
.lines()
.map(|line| {
// Match pattern: [YYYY-MM-DDTHH:MM:SS...Z] at start of line
if line.starts_with('[') {
if let Some(end) = line.find("] ") {
// Check if it looks like a timestamp (starts with digit after [)
let potential_ts = &line[1..end];
if potential_ts.len() >= 19 && potential_ts.chars().next().map(|c| c.is_ascii_digit()).unwrap_or(false) {
return &line[end + 2..];
@ -669,10 +1176,8 @@ fn strip_timestamps(content: &str) -> String {
.join("\n")
}
/// Helper to save state from within a spawned task (simplified version)
/// Helper to save state from within a spawned task
async fn save_state_to_file(base_dir: &PathBuf) -> Result<(), std::io::Error> {
// This is a no-op for now - the state will be saved on the next explicit save_state call
// A more robust implementation would use a channel to communicate with the ProcessManager
let _ = base_dir;
Ok(())
}
@ -685,38 +1190,66 @@ mod tests {
async fn test_process_manager_basic() {
let manager = ProcessManager::new();
// List should be empty initially (or have persisted state)
let list = manager.list_processes().await;
let initial_count = list.processes.len();
// Start a simple process
let request = StartProcessRequest {
command: "echo".to_string(),
args: vec!["hello".to_string()],
cwd: None,
env: HashMap::new(),
tty: false,
interactive: false,
terminal_size: None,
};
let response = manager.start_process(request).await.unwrap();
assert!(!response.id.is_empty());
assert_eq!(response.status, ProcessStatus::Running);
assert!(!response.tty);
// Wait a bit for the process to complete
tokio::time::sleep(Duration::from_millis(200)).await;
// Check the process info
let info = manager.get_process(&response.id).await.unwrap();
assert_eq!(info.command, "echo");
assert!(!info.tty);
// List should have one more process
let list = manager.list_processes().await;
assert_eq!(list.processes.len(), initial_count + 1);
// Delete the process
manager.delete_process(&response.id).await.unwrap();
// List should be back to initial count
let list = manager.list_processes().await;
assert_eq!(list.processes.len(), initial_count);
}
#[cfg(unix)]
#[tokio::test]
async fn test_pty_process() {
let manager = ProcessManager::new();
let request = StartProcessRequest {
command: "sh".to_string(),
args: vec!["-c".to_string(), "echo hello && exit 0".to_string()],
cwd: None,
env: HashMap::new(),
tty: true,
interactive: true,
terminal_size: Some(TerminalSize { cols: 80, rows: 24 }),
};
let response = manager.start_process(request).await.unwrap();
assert!(response.tty);
assert!(response.interactive);
let info = manager.get_process(&response.id).await.unwrap();
assert!(info.tty);
assert!(info.terminal_size.is_some());
// Wait for process to complete
tokio::time::sleep(Duration::from_millis(500)).await;
// Cleanup
let _ = manager.delete_process(&response.id).await;
}
}

View file

@ -41,7 +41,9 @@ use crate::ui;
use crate::process_manager::{
ProcessManager, ProcessInfo, ProcessListResponse, ProcessLogPaths, ProcessStatus,
StartProcessRequest, StartProcessResponse, LogsQuery, LogsResponse,
ResizeTerminalRequest, TerminalSize, WriteInputRequest,
};
use crate::terminal::terminal_ws_handler;
use sandbox_agent_agent_management::agents::{
AgentError as ManagerError, AgentId, AgentManager, InstallOptions, SpawnOptions, StreamingSpawn,
};
@ -130,6 +132,14 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
.route("/process/:id/stop", post(stop_process))
.route("/process/:id/kill", post(kill_process))
.route("/process/:id/logs", get(get_process_logs))
// Terminal/PTY routes
.route("/process/:id/resize", post(resize_terminal))
.route("/process/:id/input", post(write_terminal_input))
.with_state(shared.clone());
// WebSocket routes (outside of auth middleware for easier client access)
let ws_router = Router::new()
.route("/v1/process/:id/terminal", get(terminal_ws))
.with_state(shared.clone());
if shared.auth.token.is_some() {
@ -141,7 +151,8 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
let mut router = Router::new()
.route("/", get(get_root))
.nest("/v1", v1_router);
.nest("/v1", v1_router)
.merge(ws_router); // Add WebSocket routes
if ui::is_enabled() {
router = router.merge(ui::router());
@ -177,7 +188,9 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
stop_process,
kill_process,
delete_process,
get_process_logs
get_process_logs,
resize_terminal,
write_terminal_input
),
components(
schemas(
@ -235,7 +248,10 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
StartProcessRequest,
StartProcessResponse,
LogsQuery,
LogsResponse
LogsResponse,
TerminalSize,
ResizeTerminalRequest,
WriteInputRequest
)
),
tags(
@ -4090,6 +4106,82 @@ async fn get_process_logs(
}
}
/// Resize a PTY terminal
#[utoipa::path(
post,
path = "/v1/process/{id}/resize",
tag = "process",
params(
("id" = String, Path, description = "Process ID")
),
request_body = ResizeTerminalRequest,
responses(
(status = 200, description = "Terminal resized successfully"),
(status = 400, description = "Process does not have a PTY"),
(status = 404, description = "Process not found")
)
)]
async fn resize_terminal(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(request): Json<ResizeTerminalRequest>,
) -> Result<StatusCode, ApiError> {
state
.process_manager
.resize_terminal(&id, request.cols, request.rows)
.await?;
Ok(StatusCode::OK)
}
/// Write data to a PTY terminal's input
#[utoipa::path(
post,
path = "/v1/process/{id}/input",
tag = "process",
params(
("id" = String, Path, description = "Process ID")
),
request_body = WriteInputRequest,
responses(
(status = 200, description = "Data written to terminal"),
(status = 400, description = "Process does not have a PTY or invalid data"),
(status = 404, description = "Process not found")
)
)]
async fn write_terminal_input(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(request): Json<WriteInputRequest>,
) -> Result<StatusCode, ApiError> {
let data = if request.base64 {
use base64::Engine;
base64::engine::general_purpose::STANDARD
.decode(&request.data)
.map_err(|e| SandboxError::InvalidRequest {
message: format!("Invalid base64 data: {}", e),
})?
} else {
request.data.into_bytes()
};
state
.process_manager
.write_terminal_input(&id, data)
.await?;
Ok(StatusCode::OK)
}
/// WebSocket endpoint for terminal I/O
async fn terminal_ws(
ws: axum::extract::ws::WebSocketUpgrade,
Path(id): Path<String>,
State(state): State<Arc<AppState>>,
) -> Result<Response, ApiError> {
terminal_ws_handler(ws, Path(id), State(state.process_manager.clone()))
.await
.map_err(|e| ApiError::Sandbox(e))
}
fn all_agents() -> [AgentId; 5] {
[
AgentId::Claude,

View file

@ -0,0 +1,215 @@
//! Terminal WebSocket handler for interactive PTY sessions.
//!
//! Provides bidirectional terminal I/O over WebSocket connections.
use std::sync::Arc;
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Path, State,
},
response::Response,
};
use futures::{SinkExt, StreamExt};
use tokio::sync::broadcast;
use crate::process_manager::{ProcessManager, TerminalMessage};
use sandbox_agent_error::SandboxError;
/// WebSocket upgrade handler for terminal connections
pub async fn terminal_ws_handler(
ws: WebSocketUpgrade,
Path(id): Path<String>,
State(process_manager): State<Arc<ProcessManager>>,
) -> Result<Response, SandboxError> {
// Verify the process exists and has PTY
let info = process_manager.get_process(&id).await?;
if !info.tty {
return Err(SandboxError::InvalidRequest {
message: "Process does not have a PTY allocated. Start with tty: true".to_string(),
});
}
// Check if process is still running
if info.exit_code.is_some() {
return Err(SandboxError::InvalidRequest {
message: "Process has already exited".to_string(),
});
}
Ok(ws.on_upgrade(move |socket| handle_terminal_socket(socket, id, process_manager)))
}
/// Handle the WebSocket connection for terminal I/O
async fn handle_terminal_socket(
socket: WebSocket,
process_id: String,
process_manager: Arc<ProcessManager>,
) {
let (mut ws_sender, mut ws_receiver) = socket.split();
// Get terminal output subscription and input sender
let output_rx = match process_manager.subscribe_terminal_output(&process_id).await {
Ok(rx) => rx,
Err(e) => {
let msg = TerminalMessage::Error {
message: format!("Failed to subscribe to terminal output: {}", e),
};
let _ = ws_sender
.send(Message::Text(serde_json::to_string(&msg).unwrap()))
.await;
return;
}
};
let input_tx = match process_manager.get_terminal_input_sender(&process_id).await {
Ok(tx) => tx,
Err(e) => {
let msg = TerminalMessage::Error {
message: format!("Failed to get terminal input channel: {}", e),
};
let _ = ws_sender
.send(Message::Text(serde_json::to_string(&msg).unwrap()))
.await;
return;
}
};
// Task to forward terminal output to WebSocket
let process_manager_clone = process_manager.clone();
let process_id_clone = process_id.clone();
let output_task = tokio::spawn(async move {
forward_output_to_ws(output_rx, ws_sender, process_manager_clone, process_id_clone).await;
});
// Handle input from WebSocket
let process_manager_clone = process_manager.clone();
let process_id_clone = process_id.clone();
while let Some(msg) = ws_receiver.next().await {
match msg {
Ok(Message::Text(text)) => {
if let Ok(terminal_msg) = serde_json::from_str::<TerminalMessage>(&text) {
match terminal_msg {
TerminalMessage::Input { data } => {
// Send input to terminal
if input_tx.send(data.into_bytes()).is_err() {
break;
}
}
TerminalMessage::Resize { cols, rows } => {
// Resize terminal
if let Err(e) = process_manager_clone
.resize_terminal(&process_id_clone, cols, rows)
.await
{
tracing::warn!("Failed to resize terminal: {}", e);
}
}
_ => {
// Ignore other message types from client
}
}
}
}
Ok(Message::Binary(data)) => {
// Binary data is treated as raw terminal input
if input_tx.send(data).is_err() {
break;
}
}
Ok(Message::Close(_)) => {
break;
}
Err(_) => {
break;
}
_ => {}
}
}
// Cancel output task
output_task.abort();
}
/// Forward terminal output to WebSocket
async fn forward_output_to_ws(
mut output_rx: broadcast::Receiver<Vec<u8>>,
mut ws_sender: futures::stream::SplitSink<WebSocket, Message>,
process_manager: Arc<ProcessManager>,
process_id: String,
) {
loop {
tokio::select! {
result = output_rx.recv() => {
match result {
Ok(data) => {
// Try to convert to UTF-8, otherwise send as binary
match String::from_utf8(data.clone()) {
Ok(text) => {
let msg = TerminalMessage::Data { data: text };
if ws_sender
.send(Message::Text(serde_json::to_string(&msg).unwrap()))
.await
.is_err()
{
break;
}
}
Err(_) => {
// Send as binary for non-UTF8 data
if ws_sender.send(Message::Binary(data)).await.is_err() {
break;
}
}
}
}
Err(broadcast::error::RecvError::Closed) => {
// Channel closed, process likely exited
break;
}
Err(broadcast::error::RecvError::Lagged(_)) => {
// Missed some messages, continue
continue;
}
}
}
_ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
// Check if process is still running
if let Ok(info) = process_manager.get_process(&process_id).await {
if info.exit_code.is_some() {
// Send exit message
let msg = TerminalMessage::Exit { code: info.exit_code };
let _ = ws_sender
.send(Message::Text(serde_json::to_string(&msg).unwrap()))
.await;
break;
}
} else {
break;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_terminal_message_serialization() {
let msg = TerminalMessage::Data {
data: "hello".to_string(),
};
let json = serde_json::to_string(&msg).unwrap();
assert!(json.contains("\"type\":\"data\""));
assert!(json.contains("\"data\":\"hello\""));
let msg = TerminalMessage::Resize { cols: 80, rows: 24 };
let json = serde_json::to_string(&msg).unwrap();
assert!(json.contains("\"type\":\"resize\""));
assert!(json.contains("\"cols\":80"));
assert!(json.contains("\"rows\":24"));
}
}