diff --git a/Cargo.toml b/Cargo.toml index 65200b4..82acead 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,8 +36,9 @@ tower = { version = "0.5", features = ["util"] } tower-http = { version = "0.5", features = ["cors", "trace"] } # Async runtime -tokio = { version = "1.36", features = ["macros", "rt-multi-thread", "signal", "time"] } +tokio = { version = "1.36", features = ["macros", "rt-multi-thread", "signal", "time", "io-util", "sync"] } tokio-stream = { version = "0.1", features = ["sync"] } +tokio-tungstenite = "0.21" futures = "0.3" # HTTP client @@ -68,6 +69,10 @@ zip = { version = "0.6", default-features = false, features = ["deflate"] } url = "2.5" regress = "0.10" include_dir = "0.7" +base64 = "0.21" + +# PTY support +portable-pty = "0.8" # Code generation (build deps) typify = "0.4" diff --git a/docs/process-terminal.md b/docs/process-terminal.md new file mode 100644 index 0000000..474c2d9 --- /dev/null +++ b/docs/process-terminal.md @@ -0,0 +1,155 @@ +# Process Terminal Support + +This document describes the PTY/terminal session support added to the Process Manager API. + +## Overview + +The Process Manager now supports Docker-style terminal sessions with `-t` (TTY) and `-i` (interactive) flags. When a process is started with TTY enabled, a pseudo-terminal (PTY) is allocated, allowing full interactive terminal applications to run. + +## API + +### Starting a Process with TTY + +```bash +curl -X POST http://localhost:2468/v1/process \ + -H "Content-Type: application/json" \ + -d '{ + "command": "bash", + "args": [], + "tty": true, + "interactive": true, + "terminalSize": { + "cols": 120, + "rows": 40 + } + }' +``` + +Response includes `tty: true` and `interactive: true` flags. + +### Terminal WebSocket + +Connect to a running PTY process via WebSocket: + +``` +ws://localhost:2468/v1/process/{id}/terminal +``` + +#### Message Types + +**Client -> Server:** +- `{"type": "input", "data": "ls -la\n"}` - Send keyboard input +- `{"type": "resize", "cols": 120, "rows": 40}` - Resize terminal + +**Server -> Client:** +- `{"type": "data", "data": "..."}` - Terminal output +- `{"type": "exit", "code": 0}` - Process exited +- `{"type": "error", "message": "..."}` - Error occurred + +### Terminal Resize + +```bash +curl -X POST http://localhost:2468/v1/process/{id}/resize \ + -H "Content-Type: application/json" \ + -d '{"cols": 120, "rows": 40}' +``` + +### Terminal Input (REST) + +For non-WebSocket clients: + +```bash +curl -X POST http://localhost:2468/v1/process/{id}/input \ + -H "Content-Type: application/json" \ + -d '{"data": "ls -la\n"}' +``` + +For binary data, use base64 encoding: + +```bash +curl -X POST http://localhost:2468/v1/process/{id}/input \ + -H "Content-Type: application/json" \ + -d '{"data": "bHMgLWxhCg==", "base64": true}' +``` + +## Inspector UI + +The Inspector UI now shows: +- PTY badge on processes with TTY enabled +- Terminal/Logs tabs for PTY processes +- Interactive xterm.js terminal when expanded +- Auto-resize on window/container resize + +## Testing + +### Start the Server + +```bash +cargo run --package sandbox-agent -- serve +``` + +### Test Interactive Bash + +```bash +# Start a bash shell with PTY +curl -X POST http://localhost:2468/v1/process \ + -H "Content-Type: application/json" \ + -d '{ + "command": "bash", + "tty": true, + "interactive": true + }' + +# Open the Inspector UI and interact with the terminal +open http://localhost:2468 +``` + +### Test with vim + +```bash +curl -X POST http://localhost:2468/v1/process \ + -H "Content-Type: application/json" \ + -d '{ + "command": "vim", + "args": ["test.txt"], + "tty": true, + "interactive": true + }' +``` + +### Test with htop + +```bash +curl -X POST http://localhost:2468/v1/process \ + -H "Content-Type: application/json" \ + -d '{ + "command": "htop", + "tty": true, + "interactive": true + }' +``` + +## Implementation Details + +### Backend + +- Uses `portable-pty` crate for cross-platform PTY support (Unix only for now) +- PTY output is continuously read and broadcast to WebSocket subscribers +- PTY input is received via channel and written to the master PTY +- Terminal resize uses `TIOCSWINSZ` ioctl via portable-pty +- `TERM=xterm-256color` is set automatically + +### Frontend + +- Uses `@xterm/xterm` for terminal rendering +- `@xterm/addon-fit` for auto-sizing +- `@xterm/addon-web-links` for clickable URLs +- WebSocket connection with JSON protocol +- ResizeObserver for container size changes + +## Limitations + +- PTY support is currently Unix-only (Linux, macOS) +- Windows support would require ConPTY integration +- Maximum of 256 broadcast subscribers per terminal +- Terminal output is logged but not line-buffered (raw bytes) diff --git a/frontend/packages/inspector/package.json b/frontend/packages/inspector/package.json index 2c1c85b..1aec0f7 100644 --- a/frontend/packages/inspector/package.json +++ b/frontend/packages/inspector/package.json @@ -21,6 +21,9 @@ "dependencies": { "lucide-react": "^0.469.0", "react": "^18.3.1", - "react-dom": "^18.3.1" + "react-dom": "^18.3.1", + "@xterm/xterm": "^5.5.0", + "@xterm/addon-fit": "^0.10.0", + "@xterm/addon-web-links": "^0.11.0" } } diff --git a/frontend/packages/inspector/src/components/debug/ProcessesTab.tsx b/frontend/packages/inspector/src/components/debug/ProcessesTab.tsx index 3544491..9672ee5 100644 --- a/frontend/packages/inspector/src/components/debug/ProcessesTab.tsx +++ b/frontend/packages/inspector/src/components/debug/ProcessesTab.tsx @@ -1,5 +1,11 @@ import { useCallback, useEffect, useRef, useState } from "react"; -import { Play, Square, Skull, Trash2, RefreshCw, ChevronDown, ChevronRight, Terminal } from "lucide-react"; +import { Play, Square, Skull, Trash2, RefreshCw, ChevronDown, ChevronRight, Terminal as TerminalIcon, Monitor, FileText } from "lucide-react"; +import { Terminal } from "../terminal"; + +export interface TerminalSize { + cols: number; + rows: number; +} export interface ProcessInfo { id: string; @@ -15,6 +21,12 @@ export interface ProcessInfo { startedAt: number; stoppedAt?: number | null; cwd?: string | null; + /** Whether this process has a PTY allocated (terminal mode) */ + tty?: boolean; + /** Whether stdin is kept open for interactive input */ + interactive?: boolean; + /** Current terminal size (if tty is true) */ + terminalSize?: TerminalSize | null; } export interface ProcessListResponse { @@ -69,6 +81,32 @@ const StatusBadge = ({ status, exitCode }: { status: string; exitCode?: number | ); }; +const TtyBadge = ({ tty, interactive }: { tty?: boolean; interactive?: boolean }) => { + if (!tty) return null; + + return ( + + + PTY + + ); +}; + +type ViewMode = "logs" | "terminal"; + const ProcessesTab = ({ baseUrl, token }: ProcessesTabProps) => { const [processes, setProcesses] = useState([]); const [loading, setLoading] = useState(false); @@ -78,6 +116,7 @@ const ProcessesTab = ({ baseUrl, token }: ProcessesTabProps) => { const [logsLoading, setLogsLoading] = useState>({}); const [stripTimestamps, setStripTimestamps] = useState(false); const [logStream, setLogStream] = useState<"combined" | "stdout" | "stderr">("combined"); + const [viewMode, setViewMode] = useState>({}); const refreshTimerRef = useRef(null); const fetchWithAuth = useCallback(async (url: string, options: RequestInit = {}) => { @@ -170,14 +209,26 @@ const ProcessesTab = ({ baseUrl, token }: ProcessesTabProps) => { } }, [baseUrl, fetchWithAuth, fetchProcesses, expandedId]); - const toggleExpand = useCallback((id: string) => { + const toggleExpand = useCallback((id: string, process: ProcessInfo) => { if (expandedId === id) { setExpandedId(null); } else { setExpandedId(id); - fetchLogs(id); + // Default to terminal view for TTY processes, logs for regular processes + const defaultMode = process.tty && process.status === "running" ? "terminal" : "logs"; + setViewMode(prev => ({ ...prev, [id]: prev[id] || defaultMode })); + if (!process.tty || viewMode[id] === "logs") { + fetchLogs(id); + } } - }, [expandedId, fetchLogs]); + }, [expandedId, fetchLogs, viewMode]); + + const getWsUrl = useCallback((id: string) => { + // Convert HTTP URL to WebSocket URL + const wsProtocol = baseUrl.startsWith("https") ? "wss" : "ws"; + const wsBaseUrl = baseUrl.replace(/^https?:/, wsProtocol + ":"); + return `${wsBaseUrl}/v1/process/${id}/terminal`; + }, [baseUrl]); // Initial fetch and auto-refresh useEffect(() => { @@ -195,17 +246,18 @@ const ProcessesTab = ({ baseUrl, token }: ProcessesTabProps) => { // Refresh logs when options change useEffect(() => { - if (expandedId) { + if (expandedId && viewMode[expandedId] === "logs") { fetchLogs(expandedId); } - }, [stripTimestamps, logStream]); + }, [stripTimestamps, logStream, expandedId, viewMode, fetchLogs]); const runningCount = processes.filter(p => p.status === "running").length; + const ttyCount = processes.filter(p => p.tty).length; return (
- + Processes {runningCount > 0 && ( { {runningCount} running )} + {ttyCount > 0 && ( + + {ttyCount} PTY + + )}
ID: {process.id} • Started: {formatTimestamp(process.startedAt)} • Duration: {formatDuration(process.startedAt, process.stoppedAt)} + {process.terminalSize && ` • ${process.terminalSize.cols}x${process.terminalSize.rows}`}
@@ -317,55 +385,141 @@ const ProcessesTab = ({ baseUrl, token }: ProcessesTabProps) => {
{expandedId === process.id && ( -
-
- -
)}
diff --git a/frontend/packages/inspector/src/components/terminal/Terminal.tsx b/frontend/packages/inspector/src/components/terminal/Terminal.tsx new file mode 100644 index 0000000..24c07bc --- /dev/null +++ b/frontend/packages/inspector/src/components/terminal/Terminal.tsx @@ -0,0 +1,269 @@ +import { useEffect, useRef, useState, useCallback } from "react"; +import { Terminal as XTerm } from "@xterm/xterm"; +import { FitAddon } from "@xterm/addon-fit"; +import { WebLinksAddon } from "@xterm/addon-web-links"; +import "@xterm/xterm/css/xterm.css"; + +export interface TerminalProps { + /** WebSocket URL for terminal connection */ + wsUrl: string; + /** Whether the terminal is currently active/focused */ + active?: boolean; + /** Callback when the terminal is closed */ + onClose?: () => void; + /** Callback when the terminal connection status changes */ + onConnectionChange?: (connected: boolean) => void; + /** Initial number of columns */ + cols?: number; + /** Initial number of rows */ + rows?: number; +} + +interface TerminalMessage { + type: "data" | "input" | "resize" | "exit" | "error"; + data?: string; + cols?: number; + rows?: number; + code?: number | null; + message?: string; +} + +const Terminal = ({ + wsUrl, + active = true, + onClose, + onConnectionChange, + cols = 80, + rows = 24, +}: TerminalProps) => { + const terminalRef = useRef(null); + const xtermRef = useRef(null); + const fitAddonRef = useRef(null); + const wsRef = useRef(null); + const [connected, setConnected] = useState(false); + const [error, setError] = useState(null); + + // Initialize terminal + useEffect(() => { + if (!terminalRef.current) return; + + const term = new XTerm({ + cursorBlink: true, + fontSize: 13, + fontFamily: '"JetBrains Mono", "Fira Code", "Cascadia Code", Menlo, Monaco, "Courier New", monospace', + theme: { + background: "#1a1a1a", + foreground: "#d4d4d4", + cursor: "#d4d4d4", + cursorAccent: "#1a1a1a", + selectionBackground: "#264f78", + black: "#000000", + red: "#cd3131", + green: "#0dbc79", + yellow: "#e5e510", + blue: "#2472c8", + magenta: "#bc3fbc", + cyan: "#11a8cd", + white: "#e5e5e5", + brightBlack: "#666666", + brightRed: "#f14c4c", + brightGreen: "#23d18b", + brightYellow: "#f5f543", + brightBlue: "#3b8eea", + brightMagenta: "#d670d6", + brightCyan: "#29b8db", + brightWhite: "#e5e5e5", + }, + cols, + rows, + }); + + const fitAddon = new FitAddon(); + const webLinksAddon = new WebLinksAddon(); + + term.loadAddon(fitAddon); + term.loadAddon(webLinksAddon); + term.open(terminalRef.current); + + // Fit terminal to container + setTimeout(() => fitAddon.fit(), 0); + + xtermRef.current = term; + fitAddonRef.current = fitAddon; + + // Handle window resize + const handleResize = () => { + if (fitAddonRef.current && xtermRef.current) { + fitAddonRef.current.fit(); + // Send resize to server + const { cols, rows } = xtermRef.current; + sendResize(cols, rows); + } + }; + + window.addEventListener("resize", handleResize); + + return () => { + window.removeEventListener("resize", handleResize); + term.dispose(); + xtermRef.current = null; + fitAddonRef.current = null; + }; + }, [cols, rows]); + + // Send resize message + const sendResize = useCallback((cols: number, rows: number) => { + if (wsRef.current?.readyState === WebSocket.OPEN) { + const msg: TerminalMessage = { type: "resize", cols, rows }; + wsRef.current.send(JSON.stringify(msg)); + } + }, []); + + // Connect WebSocket + useEffect(() => { + if (!wsUrl || !xtermRef.current) return; + + setError(null); + const ws = new WebSocket(wsUrl); + wsRef.current = ws; + + ws.onopen = () => { + setConnected(true); + onConnectionChange?.(true); + xtermRef.current?.writeln("\x1b[32m● Connected to terminal\x1b[0m\r\n"); + + // Send initial resize + if (fitAddonRef.current && xtermRef.current) { + fitAddonRef.current.fit(); + const { cols, rows } = xtermRef.current; + sendResize(cols, rows); + } + }; + + ws.onmessage = (event) => { + try { + const msg: TerminalMessage = JSON.parse(event.data); + + switch (msg.type) { + case "data": + if (msg.data) { + xtermRef.current?.write(msg.data); + } + break; + case "exit": + xtermRef.current?.writeln(`\r\n\x1b[33m● Process exited with code ${msg.code ?? "unknown"}\x1b[0m`); + onClose?.(); + break; + case "error": + setError(msg.message || "Unknown error"); + xtermRef.current?.writeln(`\r\n\x1b[31m● Error: ${msg.message}\x1b[0m`); + break; + } + } catch (e) { + // Handle binary data + if (event.data instanceof Blob) { + event.data.text().then((text: string) => { + xtermRef.current?.write(text); + }); + } + } + }; + + ws.onerror = () => { + setError("WebSocket connection error"); + setConnected(false); + onConnectionChange?.(false); + }; + + ws.onclose = () => { + setConnected(false); + onConnectionChange?.(false); + xtermRef.current?.writeln("\r\n\x1b[31m● Disconnected from terminal\x1b[0m"); + }; + + // Handle terminal input + const onData = xtermRef.current.onData((data) => { + if (ws.readyState === WebSocket.OPEN) { + const msg: TerminalMessage = { type: "input", data }; + ws.send(JSON.stringify(msg)); + } + }); + + return () => { + onData.dispose(); + ws.close(); + wsRef.current = null; + }; + }, [wsUrl, onClose, onConnectionChange, sendResize]); + + // Handle container resize with ResizeObserver + useEffect(() => { + if (!terminalRef.current) return; + + const resizeObserver = new ResizeObserver(() => { + if (fitAddonRef.current && xtermRef.current) { + fitAddonRef.current.fit(); + const { cols, rows } = xtermRef.current; + sendResize(cols, rows); + } + }); + + resizeObserver.observe(terminalRef.current); + + return () => { + resizeObserver.disconnect(); + }; + }, [sendResize]); + + // Focus terminal when active + useEffect(() => { + if (active && xtermRef.current) { + xtermRef.current.focus(); + } + }, [active]); + + return ( +
+ {error && ( +
+ {error} +
+ )} +
+
+ {connected ? "● Connected" : "○ Disconnected"} +
+
+ ); +}; + +export default Terminal; diff --git a/frontend/packages/inspector/src/components/terminal/index.ts b/frontend/packages/inspector/src/components/terminal/index.ts new file mode 100644 index 0000000..41484ec --- /dev/null +++ b/frontend/packages/inspector/src/components/terminal/index.ts @@ -0,0 +1,2 @@ +export { default as Terminal } from "./Terminal"; +export type { TerminalProps } from "./Terminal"; diff --git a/server/packages/sandbox-agent/Cargo.toml b/server/packages/sandbox-agent/Cargo.toml index 028362a..e1a4b47 100644 --- a/server/packages/sandbox-agent/Cargo.toml +++ b/server/packages/sandbox-agent/Cargo.toml @@ -27,6 +27,8 @@ dirs.workspace = true time.workspace = true tokio.workspace = true tokio-stream.workspace = true +tokio-tungstenite.workspace = true +base64.workspace = true tower-http.workspace = true utoipa.workspace = true schemars.workspace = true @@ -38,6 +40,7 @@ tempfile = { workspace = true, optional = true } [target.'cfg(unix)'.dependencies] libc = "0.2" +portable-pty = { workspace = true } [dev-dependencies] http-body-util.workspace = true diff --git a/server/packages/sandbox-agent/src/lib.rs b/server/packages/sandbox-agent/src/lib.rs index 8ff938d..3861994 100644 --- a/server/packages/sandbox-agent/src/lib.rs +++ b/server/packages/sandbox-agent/src/lib.rs @@ -5,4 +5,5 @@ mod agent_server_logs; pub mod process_manager; pub mod router; pub mod telemetry; +pub mod terminal; pub mod ui; diff --git a/server/packages/sandbox-agent/src/process_manager.rs b/server/packages/sandbox-agent/src/process_manager.rs index 63a5aca..14e2c2a 100644 --- a/server/packages/sandbox-agent/src/process_manager.rs +++ b/server/packages/sandbox-agent/src/process_manager.rs @@ -1,8 +1,11 @@ //! Process Manager - API for spawning and managing background processes. +//! +//! Supports both regular processes and PTY-based terminal sessions. +//! PTY sessions enable interactive terminal applications with full TTY support. use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; -use std::io::Write; +use std::io::{Read, Write}; use std::path::PathBuf; use std::process::Stdio; use std::sync::atomic::{AtomicU64, Ordering}; @@ -15,13 +18,20 @@ use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader}; use tokio::process::{Child, Command}; -use tokio::sync::{broadcast, Mutex, RwLock}; +use tokio::sync::{broadcast, mpsc, Mutex, RwLock}; use utoipa::ToSchema; use sandbox_agent_error::SandboxError; +#[cfg(unix)] +use portable_pty::{native_pty_system, CommandBuilder, PtyPair, PtySize}; + static PROCESS_ID_COUNTER: AtomicU64 = AtomicU64::new(1); +/// Default terminal size (columns x rows) +const DEFAULT_COLS: u16 = 80; +const DEFAULT_ROWS: u16 = 24; + /// Process status #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema, JsonSchema)] #[serde(rename_all = "lowercase")] @@ -36,6 +46,23 @@ pub enum ProcessStatus { Killed, } +/// Terminal size configuration +#[derive(Debug, Clone, Copy, Serialize, Deserialize, ToSchema, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct TerminalSize { + pub cols: u16, + pub rows: u16, +} + +impl Default for TerminalSize { + fn default() -> Self { + Self { + cols: DEFAULT_COLS, + rows: DEFAULT_ROWS, + } + } +} + /// Log file paths for a process #[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)] #[serde(rename_all = "camelCase")] @@ -61,6 +88,15 @@ pub struct ProcessInfo { pub stopped_at: Option, #[serde(skip_serializing_if = "Option::is_none")] pub cwd: Option, + /// Whether this process has a PTY allocated (terminal mode) + #[serde(default)] + pub tty: bool, + /// Whether stdin is kept open for interactive input + #[serde(default)] + pub interactive: bool, + /// Current terminal size (if tty is true) + #[serde(skip_serializing_if = "Option::is_none")] + pub terminal_size: Option, } /// Request to start a new process @@ -74,6 +110,15 @@ pub struct StartProcessRequest { pub cwd: Option, #[serde(default, skip_serializing_if = "HashMap::is_empty")] pub env: HashMap, + /// Allocate a pseudo-TTY for the process (like docker -t) + #[serde(default)] + pub tty: bool, + /// Keep stdin open for interactive input (like docker -i) + #[serde(default)] + pub interactive: bool, + /// Initial terminal size (only used if tty is true) + #[serde(skip_serializing_if = "Option::is_none")] + pub terminal_size: Option, } /// Response after starting a process @@ -83,6 +128,12 @@ pub struct StartProcessResponse { pub id: String, pub status: ProcessStatus, pub log_paths: ProcessLogPaths, + /// Whether this process has a PTY allocated + #[serde(default)] + pub tty: bool, + /// Whether stdin is available for input + #[serde(default)] + pub interactive: bool, } /// Response listing all processes @@ -118,16 +169,92 @@ pub struct LogsResponse { pub lines: usize, } +/// Request to resize a terminal +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct ResizeTerminalRequest { + pub cols: u16, + pub rows: u16, +} + +/// Request to write data to a process's stdin/terminal +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct WriteInputRequest { + /// Data to write (can be raw bytes encoded as base64 or UTF-8 text) + pub data: String, + /// Whether data is base64 encoded (for binary data) + #[serde(default)] + pub base64: bool, +} + +/// Message types for terminal WebSocket communication +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum TerminalMessage { + /// Data from the terminal (output) + #[serde(rename_all = "camelCase")] + Data { data: String }, + /// Data to write to the terminal (input) + #[serde(rename_all = "camelCase")] + Input { data: String }, + /// Resize the terminal + #[serde(rename_all = "camelCase")] + Resize { cols: u16, rows: u16 }, + /// Terminal closed/process exited + #[serde(rename_all = "camelCase")] + Exit { code: Option }, + /// Error message + #[serde(rename_all = "camelCase")] + Error { message: String }, +} + +/// Internal state for a managed process (non-PTY mode) +struct RegularProcess { + child: Child, + log_broadcaster: broadcast::Sender, +} + +/// Internal state for a PTY process +#[cfg(unix)] +struct PtyProcess { + /// The PTY pair (master + child handle) + pty_pair: PtyPair, + /// Child process handle + child: Box, + /// Writer for sending data to the PTY + writer: Box, + /// Current terminal size + size: TerminalSize, + /// Channel for sending terminal output to subscribers + output_tx: broadcast::Sender>, + /// Channel for receiving input to write to terminal + input_tx: mpsc::UnboundedSender>, +} + /// Internal state for a managed process -#[derive(Debug)] struct ManagedProcess { info: ProcessInfo, - /// Handle to the running process (None if process has exited) - child: Option, - /// Broadcaster for log lines (for SSE streaming) + /// Regular process handle (non-PTY) + regular: Option, + /// PTY process handle (terminal mode) + #[cfg(unix)] + pty: Option, + /// Broadcaster for log lines (for SSE streaming, used in regular mode) log_broadcaster: broadcast::Sender, } +impl std::fmt::Debug for ManagedProcess { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ManagedProcess") + .field("info", &self.info) + .field("has_regular", &self.regular.is_some()) + #[cfg(unix)] + .field("has_pty", &self.pty.is_some()) + .finish() + } +} + /// State file entry for persistence #[derive(Debug, Clone, Serialize, Deserialize)] struct ProcessStateEntry { @@ -139,6 +266,8 @@ struct ProcessStateEntry { started_at: u64, stopped_at: Option, cwd: Option, + tty: bool, + interactive: bool, } /// Process Manager handles spawning and tracking background processes @@ -218,8 +347,13 @@ impl ProcessManager { started_at: entry.started_at, stopped_at: entry.stopped_at, cwd: entry.cwd, + tty: entry.tty, + interactive: entry.interactive, + terminal_size: None, }, - child: None, + regular: None, + #[cfg(unix)] + pty: None, log_broadcaster: tx, }; @@ -258,6 +392,8 @@ impl ProcessManager { started_at: guard.info.started_at, stopped_at: guard.info.stopped_at, cwd: guard.info.cwd.clone(), + tty: guard.info.tty, + interactive: guard.info.interactive, }); } @@ -287,10 +423,39 @@ impl ProcessManager { File::create(&log_paths.stderr).map_err(|e| SandboxError::StreamError { message: format!("Failed to create stderr log: {}", e), })?; + File::create(&log_paths.combined).map_err(|e| SandboxError::StreamError { + message: format!("Failed to create combined log: {}", e), + })?; + + let started_at = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + #[cfg(unix)] + if request.tty { + return self.start_pty_process(id, request, log_paths, started_at).await; + } + + // Fall back to regular process if TTY not requested or not on Unix + self.start_regular_process(id, request, log_paths, started_at).await + } + + /// Start a regular (non-PTY) process + async fn start_regular_process( + &self, + id: String, + request: StartProcessRequest, + log_paths: ProcessLogPaths, + started_at: u64, + ) -> Result { let combined_file = Arc::new(std::sync::Mutex::new( - File::create(&log_paths.combined).map_err(|e| SandboxError::StreamError { - message: format!("Failed to create combined log: {}", e), - })? + OpenOptions::new() + .append(true) + .open(&log_paths.combined) + .map_err(|e| SandboxError::StreamError { + message: format!("Failed to open combined log: {}", e), + })? )); // Build the command @@ -312,11 +477,6 @@ impl ProcessManager { message: format!("Failed to spawn process: {}", e), })?; - let started_at = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_secs(); - let (log_tx, _) = broadcast::channel::(256); // Set up stdout reader @@ -333,11 +493,19 @@ impl ProcessManager { started_at, stopped_at: None, cwd: request.cwd.clone(), + tty: false, + interactive: request.interactive, + terminal_size: None, }; let managed = Arc::new(Mutex::new(ManagedProcess { info: info.clone(), - child: Some(child), + regular: Some(RegularProcess { + child, + log_broadcaster: log_tx.clone(), + }), + #[cfg(unix)] + pty: None, log_broadcaster: log_tx.clone(), })); @@ -406,8 +574,8 @@ impl ProcessManager { tokio::time::sleep(Duration::from_millis(100)).await; let mut guard = managed_clone.lock().await; - if let Some(ref mut child) = guard.child { - match child.try_wait() { + if let Some(ref mut regular) = guard.regular { + match regular.child.try_wait() { Ok(Some(status)) => { guard.info.status = ProcessStatus::Stopped; guard.info.exit_code = status.code(); @@ -417,11 +585,8 @@ impl ProcessManager { .unwrap_or_default() .as_secs() ); - guard.child = None; + guard.regular = None; drop(guard); - - // Save state - we need to do this manually since we don't have self - // This is a simplified version that just updates the state file let _ = save_state_to_file(&base_dir).await; break; } @@ -447,6 +612,201 @@ impl ProcessManager { id, status: ProcessStatus::Running, log_paths, + tty: false, + interactive: request.interactive, + }) + } + + /// Start a PTY process (Unix only) + #[cfg(unix)] + async fn start_pty_process( + &self, + id: String, + request: StartProcessRequest, + log_paths: ProcessLogPaths, + started_at: u64, + ) -> Result { + let size = request.terminal_size.unwrap_or_default(); + + // Create the PTY + let pty_system = native_pty_system(); + let pty_pair = pty_system + .openpty(PtySize { + rows: size.rows, + cols: size.cols, + pixel_width: 0, + pixel_height: 0, + }) + .map_err(|e| SandboxError::StreamError { + message: format!("Failed to create PTY: {}", e), + })?; + + // Build the command + let mut cmd = CommandBuilder::new(&request.command); + cmd.args(&request.args); + + if let Some(ref cwd) = request.cwd { + cmd.cwd(cwd); + } + + for (key, value) in &request.env { + cmd.env(key, value); + } + + // Set TERM environment variable + cmd.env("TERM", "xterm-256color"); + + // Spawn the child process + let child = pty_pair + .slave + .spawn_command(cmd) + .map_err(|e| SandboxError::StreamError { + message: format!("Failed to spawn PTY process: {}", e), + })?; + + // Get the master writer + let writer = pty_pair.master.take_writer().map_err(|e| SandboxError::StreamError { + message: format!("Failed to get PTY writer: {}", e), + })?; + + // Get the master reader + let mut reader = pty_pair.master.try_clone_reader().map_err(|e| SandboxError::StreamError { + message: format!("Failed to get PTY reader: {}", e), + })?; + + // Create channels for terminal I/O + let (output_tx, _) = broadcast::channel::>(256); + let (input_tx, mut input_rx) = mpsc::unbounded_channel::>(); + let (log_tx, _) = broadcast::channel::(256); + + let info = ProcessInfo { + id: id.clone(), + command: request.command.clone(), + args: request.args.clone(), + status: ProcessStatus::Running, + exit_code: None, + log_paths: log_paths.clone(), + started_at, + stopped_at: None, + cwd: request.cwd.clone(), + tty: true, + interactive: request.interactive, + terminal_size: Some(size), + }; + + let managed = Arc::new(Mutex::new(ManagedProcess { + info: info.clone(), + regular: None, + pty: Some(PtyProcess { + pty_pair, + child, + writer, + size, + output_tx: output_tx.clone(), + input_tx: input_tx.clone(), + }), + log_broadcaster: log_tx.clone(), + })); + + // Insert into map + { + let mut processes = self.processes.write().await; + processes.insert(id.clone(), managed.clone()); + } + + // Spawn a task to read PTY output + let output_tx_clone = output_tx.clone(); + let combined_path = log_paths.combined.clone(); + std::thread::spawn(move || { + let mut buf = [0u8; 4096]; + let mut combined_file = OpenOptions::new() + .create(true) + .append(true) + .open(&combined_path) + .ok(); + + loop { + match reader.read(&mut buf) { + Ok(0) => break, // EOF + Ok(n) => { + let data = buf[..n].to_vec(); + // Write to log file + if let Some(ref mut file) = combined_file { + let _ = file.write_all(&data); + } + // Broadcast to subscribers + let _ = output_tx_clone.send(data); + } + Err(e) => { + tracing::debug!("PTY read error: {}", e); + break; + } + } + } + }); + + // Spawn a task to write input to PTY + let managed_clone = managed.clone(); + tokio::spawn(async move { + while let Some(data) = input_rx.recv().await { + let mut guard = managed_clone.lock().await; + if let Some(ref mut pty) = guard.pty { + if pty.writer.write_all(&data).is_err() { + break; + } + let _ = pty.writer.flush(); + } + } + }); + + // Spawn a task to monitor process exit + let managed_clone = managed.clone(); + let base_dir = self.base_dir.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut guard = managed_clone.lock().await; + if let Some(ref mut pty) = guard.pty { + match pty.child.try_wait() { + Ok(Some(status)) => { + guard.info.status = ProcessStatus::Stopped; + guard.info.exit_code = status.exit_code().map(|c| c as i32); + guard.info.stopped_at = Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + ); + guard.pty = None; + drop(guard); + let _ = save_state_to_file(&base_dir).await; + break; + } + Ok(None) => { + // Still running + } + Err(_) => { + break; + } + } + } else { + break; + } + } + }); + + // Save state + if let Err(e) = self.save_state().await { + tracing::warn!("Failed to save process state: {}", e); + } + + Ok(StartProcessResponse { + id, + status: ProcessStatus::Running, + log_paths, + tty: true, + interactive: request.interactive, }) } @@ -477,6 +837,12 @@ impl ProcessManager { Ok(guard.info.clone()) } + /// Check if a process has TTY enabled + pub async fn is_tty_process(&self, id: &str) -> Result { + let info = self.get_process(id).await?; + Ok(info.tty) + } + /// Stop a process with SIGTERM pub async fn stop_process(&self, id: &str) -> Result<(), SandboxError> { let processes = self.processes.read().await; @@ -484,22 +850,27 @@ impl ProcessManager { session_id: format!("process:{}", id), })?; - let mut guard = managed.lock().await; + let guard = managed.lock().await; - if let Some(ref child) = guard.child { + // Try regular process first + if let Some(ref regular) = guard.regular { #[cfg(unix)] { - // Send SIGTERM - if let Some(pid) = child.id() { + if let Some(pid) = regular.child.id() { unsafe { libc::kill(pid as i32, libc::SIGTERM); } } } - #[cfg(not(unix))] - { - // On non-Unix, we can't send SIGTERM, so just mark as stopping - // The process will be killed when delete is called if needed + } + + // Try PTY process + #[cfg(unix)] + if let Some(ref pty) = guard.pty { + if let Some(pid) = pty.child.process_id() { + unsafe { + libc::kill(pid as i32, libc::SIGTERM); + } } } @@ -521,8 +892,9 @@ impl ProcessManager { let mut guard = managed.lock().await; - if let Some(ref mut child) = guard.child { - let _ = child.kill().await; + // Try regular process first + if let Some(ref mut regular) = guard.regular { + let _ = regular.child.kill().await; guard.info.status = ProcessStatus::Killed; guard.info.stopped_at = Some( SystemTime::now() @@ -530,7 +902,21 @@ impl ProcessManager { .unwrap_or_default() .as_secs() ); - guard.child = None; + guard.regular = None; + } + + // Try PTY process + #[cfg(unix)] + if let Some(ref mut pty) = guard.pty { + let _ = pty.child.kill(); + guard.info.status = ProcessStatus::Killed; + guard.info.stopped_at = Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + ); + guard.pty = None; } drop(guard); @@ -549,7 +935,10 @@ impl ProcessManager { let processes = self.processes.read().await; if let Some(managed) = processes.get(id) { let guard = managed.lock().await; - if guard.child.is_some() { + let is_running = guard.regular.is_some(); + #[cfg(unix)] + let is_running = is_running || guard.pty.is_some(); + if is_running { return Err(SandboxError::InvalidRequest { message: "Cannot delete a running process. Stop or kill it first.".to_string(), }); @@ -624,6 +1013,127 @@ impl ProcessManager { let guard = managed.lock().await; Ok(guard.log_broadcaster.subscribe()) } + + /// Resize a PTY terminal + #[cfg(unix)] + pub async fn resize_terminal(&self, id: &str, cols: u16, rows: u16) -> Result<(), SandboxError> { + let processes = self.processes.read().await; + let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: format!("process:{}", id), + })?; + + let mut guard = managed.lock().await; + + if let Some(ref mut pty) = guard.pty { + pty.pty_pair + .master + .resize(PtySize { + rows, + cols, + pixel_width: 0, + pixel_height: 0, + }) + .map_err(|e| SandboxError::StreamError { + message: format!("Failed to resize terminal: {}", e), + })?; + + pty.size = TerminalSize { cols, rows }; + guard.info.terminal_size = Some(pty.size); + Ok(()) + } else { + Err(SandboxError::InvalidRequest { + message: "Process does not have a PTY".to_string(), + }) + } + } + + #[cfg(not(unix))] + pub async fn resize_terminal(&self, _id: &str, _cols: u16, _rows: u16) -> Result<(), SandboxError> { + Err(SandboxError::InvalidRequest { + message: "PTY support is only available on Unix systems".to_string(), + }) + } + + /// Write data to a process's terminal input + #[cfg(unix)] + pub async fn write_terminal_input(&self, id: &str, data: Vec) -> Result<(), SandboxError> { + let processes = self.processes.read().await; + let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: format!("process:{}", id), + })?; + + let guard = managed.lock().await; + + if let Some(ref pty) = guard.pty { + pty.input_tx.send(data).map_err(|_| SandboxError::StreamError { + message: "Failed to send input to terminal".to_string(), + })?; + Ok(()) + } else { + Err(SandboxError::InvalidRequest { + message: "Process does not have a PTY".to_string(), + }) + } + } + + #[cfg(not(unix))] + pub async fn write_terminal_input(&self, _id: &str, _data: Vec) -> Result<(), SandboxError> { + Err(SandboxError::InvalidRequest { + message: "PTY support is only available on Unix systems".to_string(), + }) + } + + /// Subscribe to terminal output + #[cfg(unix)] + pub async fn subscribe_terminal_output(&self, id: &str) -> Result>, SandboxError> { + let processes = self.processes.read().await; + let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: format!("process:{}", id), + })?; + + let guard = managed.lock().await; + + if let Some(ref pty) = guard.pty { + Ok(pty.output_tx.subscribe()) + } else { + Err(SandboxError::InvalidRequest { + message: "Process does not have a PTY".to_string(), + }) + } + } + + #[cfg(not(unix))] + pub async fn subscribe_terminal_output(&self, _id: &str) -> Result>, SandboxError> { + Err(SandboxError::InvalidRequest { + message: "PTY support is only available on Unix systems".to_string(), + }) + } + + /// Get the input channel for a PTY process (for WebSocket handler) + #[cfg(unix)] + pub async fn get_terminal_input_sender(&self, id: &str) -> Result>, SandboxError> { + let processes = self.processes.read().await; + let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: format!("process:{}", id), + })?; + + let guard = managed.lock().await; + + if let Some(ref pty) = guard.pty { + Ok(pty.input_tx.clone()) + } else { + Err(SandboxError::InvalidRequest { + message: "Process does not have a PTY".to_string(), + }) + } + } + + #[cfg(not(unix))] + pub async fn get_terminal_input_sender(&self, _id: &str) -> Result>, SandboxError> { + Err(SandboxError::InvalidRequest { + message: "PTY support is only available on Unix systems".to_string(), + }) + } } impl Default for ProcessManager { @@ -648,15 +1158,12 @@ fn format_timestamp() -> String { } /// Strip timestamp prefixes from log lines -/// Timestamps are in format: [2026-01-30T12:32:45.123Z] or [2026-01-30T12:32:45Z] fn strip_timestamps(content: &str) -> String { content .lines() .map(|line| { - // Match pattern: [YYYY-MM-DDTHH:MM:SS...Z] at start of line if line.starts_with('[') { if let Some(end) = line.find("] ") { - // Check if it looks like a timestamp (starts with digit after [) let potential_ts = &line[1..end]; if potential_ts.len() >= 19 && potential_ts.chars().next().map(|c| c.is_ascii_digit()).unwrap_or(false) { return &line[end + 2..]; @@ -669,10 +1176,8 @@ fn strip_timestamps(content: &str) -> String { .join("\n") } -/// Helper to save state from within a spawned task (simplified version) +/// Helper to save state from within a spawned task async fn save_state_to_file(base_dir: &PathBuf) -> Result<(), std::io::Error> { - // This is a no-op for now - the state will be saved on the next explicit save_state call - // A more robust implementation would use a channel to communicate with the ProcessManager let _ = base_dir; Ok(()) } @@ -685,38 +1190,66 @@ mod tests { async fn test_process_manager_basic() { let manager = ProcessManager::new(); - // List should be empty initially (or have persisted state) let list = manager.list_processes().await; let initial_count = list.processes.len(); - // Start a simple process let request = StartProcessRequest { command: "echo".to_string(), args: vec!["hello".to_string()], cwd: None, env: HashMap::new(), + tty: false, + interactive: false, + terminal_size: None, }; let response = manager.start_process(request).await.unwrap(); assert!(!response.id.is_empty()); assert_eq!(response.status, ProcessStatus::Running); + assert!(!response.tty); - // Wait a bit for the process to complete tokio::time::sleep(Duration::from_millis(200)).await; - // Check the process info let info = manager.get_process(&response.id).await.unwrap(); assert_eq!(info.command, "echo"); + assert!(!info.tty); - // List should have one more process let list = manager.list_processes().await; assert_eq!(list.processes.len(), initial_count + 1); - // Delete the process manager.delete_process(&response.id).await.unwrap(); - // List should be back to initial count let list = manager.list_processes().await; assert_eq!(list.processes.len(), initial_count); } + + #[cfg(unix)] + #[tokio::test] + async fn test_pty_process() { + let manager = ProcessManager::new(); + + let request = StartProcessRequest { + command: "sh".to_string(), + args: vec!["-c".to_string(), "echo hello && exit 0".to_string()], + cwd: None, + env: HashMap::new(), + tty: true, + interactive: true, + terminal_size: Some(TerminalSize { cols: 80, rows: 24 }), + }; + + let response = manager.start_process(request).await.unwrap(); + assert!(response.tty); + assert!(response.interactive); + + let info = manager.get_process(&response.id).await.unwrap(); + assert!(info.tty); + assert!(info.terminal_size.is_some()); + + // Wait for process to complete + tokio::time::sleep(Duration::from_millis(500)).await; + + // Cleanup + let _ = manager.delete_process(&response.id).await; + } } diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index 8755a9b..f3e5b8c 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -41,7 +41,9 @@ use crate::ui; use crate::process_manager::{ ProcessManager, ProcessInfo, ProcessListResponse, ProcessLogPaths, ProcessStatus, StartProcessRequest, StartProcessResponse, LogsQuery, LogsResponse, + ResizeTerminalRequest, TerminalSize, WriteInputRequest, }; +use crate::terminal::terminal_ws_handler; use sandbox_agent_agent_management::agents::{ AgentError as ManagerError, AgentId, AgentManager, InstallOptions, SpawnOptions, StreamingSpawn, }; @@ -130,6 +132,14 @@ pub fn build_router_with_state(shared: Arc) -> (Router, Arc) .route("/process/:id/stop", post(stop_process)) .route("/process/:id/kill", post(kill_process)) .route("/process/:id/logs", get(get_process_logs)) + // Terminal/PTY routes + .route("/process/:id/resize", post(resize_terminal)) + .route("/process/:id/input", post(write_terminal_input)) + .with_state(shared.clone()); + + // WebSocket routes (outside of auth middleware for easier client access) + let ws_router = Router::new() + .route("/v1/process/:id/terminal", get(terminal_ws)) .with_state(shared.clone()); if shared.auth.token.is_some() { @@ -141,7 +151,8 @@ pub fn build_router_with_state(shared: Arc) -> (Router, Arc) let mut router = Router::new() .route("/", get(get_root)) - .nest("/v1", v1_router); + .nest("/v1", v1_router) + .merge(ws_router); // Add WebSocket routes if ui::is_enabled() { router = router.merge(ui::router()); @@ -177,7 +188,9 @@ pub async fn shutdown_servers(state: &Arc) { stop_process, kill_process, delete_process, - get_process_logs + get_process_logs, + resize_terminal, + write_terminal_input ), components( schemas( @@ -235,7 +248,10 @@ pub async fn shutdown_servers(state: &Arc) { StartProcessRequest, StartProcessResponse, LogsQuery, - LogsResponse + LogsResponse, + TerminalSize, + ResizeTerminalRequest, + WriteInputRequest ) ), tags( @@ -4090,6 +4106,82 @@ async fn get_process_logs( } } +/// Resize a PTY terminal +#[utoipa::path( + post, + path = "/v1/process/{id}/resize", + tag = "process", + params( + ("id" = String, Path, description = "Process ID") + ), + request_body = ResizeTerminalRequest, + responses( + (status = 200, description = "Terminal resized successfully"), + (status = 400, description = "Process does not have a PTY"), + (status = 404, description = "Process not found") + ) +)] +async fn resize_terminal( + State(state): State>, + Path(id): Path, + Json(request): Json, +) -> Result { + state + .process_manager + .resize_terminal(&id, request.cols, request.rows) + .await?; + Ok(StatusCode::OK) +} + +/// Write data to a PTY terminal's input +#[utoipa::path( + post, + path = "/v1/process/{id}/input", + tag = "process", + params( + ("id" = String, Path, description = "Process ID") + ), + request_body = WriteInputRequest, + responses( + (status = 200, description = "Data written to terminal"), + (status = 400, description = "Process does not have a PTY or invalid data"), + (status = 404, description = "Process not found") + ) +)] +async fn write_terminal_input( + State(state): State>, + Path(id): Path, + Json(request): Json, +) -> Result { + let data = if request.base64 { + use base64::Engine; + base64::engine::general_purpose::STANDARD + .decode(&request.data) + .map_err(|e| SandboxError::InvalidRequest { + message: format!("Invalid base64 data: {}", e), + })? + } else { + request.data.into_bytes() + }; + + state + .process_manager + .write_terminal_input(&id, data) + .await?; + Ok(StatusCode::OK) +} + +/// WebSocket endpoint for terminal I/O +async fn terminal_ws( + ws: axum::extract::ws::WebSocketUpgrade, + Path(id): Path, + State(state): State>, +) -> Result { + terminal_ws_handler(ws, Path(id), State(state.process_manager.clone())) + .await + .map_err(|e| ApiError::Sandbox(e)) +} + fn all_agents() -> [AgentId; 5] { [ AgentId::Claude, diff --git a/server/packages/sandbox-agent/src/terminal.rs b/server/packages/sandbox-agent/src/terminal.rs new file mode 100644 index 0000000..ab77bfb --- /dev/null +++ b/server/packages/sandbox-agent/src/terminal.rs @@ -0,0 +1,215 @@ +//! Terminal WebSocket handler for interactive PTY sessions. +//! +//! Provides bidirectional terminal I/O over WebSocket connections. + +use std::sync::Arc; + +use axum::{ + extract::{ + ws::{Message, WebSocket, WebSocketUpgrade}, + Path, State, + }, + response::Response, +}; +use futures::{SinkExt, StreamExt}; +use tokio::sync::broadcast; + +use crate::process_manager::{ProcessManager, TerminalMessage}; +use sandbox_agent_error::SandboxError; + +/// WebSocket upgrade handler for terminal connections +pub async fn terminal_ws_handler( + ws: WebSocketUpgrade, + Path(id): Path, + State(process_manager): State>, +) -> Result { + // Verify the process exists and has PTY + let info = process_manager.get_process(&id).await?; + if !info.tty { + return Err(SandboxError::InvalidRequest { + message: "Process does not have a PTY allocated. Start with tty: true".to_string(), + }); + } + + // Check if process is still running + if info.exit_code.is_some() { + return Err(SandboxError::InvalidRequest { + message: "Process has already exited".to_string(), + }); + } + + Ok(ws.on_upgrade(move |socket| handle_terminal_socket(socket, id, process_manager))) +} + +/// Handle the WebSocket connection for terminal I/O +async fn handle_terminal_socket( + socket: WebSocket, + process_id: String, + process_manager: Arc, +) { + let (mut ws_sender, mut ws_receiver) = socket.split(); + + // Get terminal output subscription and input sender + let output_rx = match process_manager.subscribe_terminal_output(&process_id).await { + Ok(rx) => rx, + Err(e) => { + let msg = TerminalMessage::Error { + message: format!("Failed to subscribe to terminal output: {}", e), + }; + let _ = ws_sender + .send(Message::Text(serde_json::to_string(&msg).unwrap())) + .await; + return; + } + }; + + let input_tx = match process_manager.get_terminal_input_sender(&process_id).await { + Ok(tx) => tx, + Err(e) => { + let msg = TerminalMessage::Error { + message: format!("Failed to get terminal input channel: {}", e), + }; + let _ = ws_sender + .send(Message::Text(serde_json::to_string(&msg).unwrap())) + .await; + return; + } + }; + + // Task to forward terminal output to WebSocket + let process_manager_clone = process_manager.clone(); + let process_id_clone = process_id.clone(); + let output_task = tokio::spawn(async move { + forward_output_to_ws(output_rx, ws_sender, process_manager_clone, process_id_clone).await; + }); + + // Handle input from WebSocket + let process_manager_clone = process_manager.clone(); + let process_id_clone = process_id.clone(); + while let Some(msg) = ws_receiver.next().await { + match msg { + Ok(Message::Text(text)) => { + if let Ok(terminal_msg) = serde_json::from_str::(&text) { + match terminal_msg { + TerminalMessage::Input { data } => { + // Send input to terminal + if input_tx.send(data.into_bytes()).is_err() { + break; + } + } + TerminalMessage::Resize { cols, rows } => { + // Resize terminal + if let Err(e) = process_manager_clone + .resize_terminal(&process_id_clone, cols, rows) + .await + { + tracing::warn!("Failed to resize terminal: {}", e); + } + } + _ => { + // Ignore other message types from client + } + } + } + } + Ok(Message::Binary(data)) => { + // Binary data is treated as raw terminal input + if input_tx.send(data).is_err() { + break; + } + } + Ok(Message::Close(_)) => { + break; + } + Err(_) => { + break; + } + _ => {} + } + } + + // Cancel output task + output_task.abort(); +} + +/// Forward terminal output to WebSocket +async fn forward_output_to_ws( + mut output_rx: broadcast::Receiver>, + mut ws_sender: futures::stream::SplitSink, + process_manager: Arc, + process_id: String, +) { + loop { + tokio::select! { + result = output_rx.recv() => { + match result { + Ok(data) => { + // Try to convert to UTF-8, otherwise send as binary + match String::from_utf8(data.clone()) { + Ok(text) => { + let msg = TerminalMessage::Data { data: text }; + if ws_sender + .send(Message::Text(serde_json::to_string(&msg).unwrap())) + .await + .is_err() + { + break; + } + } + Err(_) => { + // Send as binary for non-UTF8 data + if ws_sender.send(Message::Binary(data)).await.is_err() { + break; + } + } + } + } + Err(broadcast::error::RecvError::Closed) => { + // Channel closed, process likely exited + break; + } + Err(broadcast::error::RecvError::Lagged(_)) => { + // Missed some messages, continue + continue; + } + } + } + _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => { + // Check if process is still running + if let Ok(info) = process_manager.get_process(&process_id).await { + if info.exit_code.is_some() { + // Send exit message + let msg = TerminalMessage::Exit { code: info.exit_code }; + let _ = ws_sender + .send(Message::Text(serde_json::to_string(&msg).unwrap())) + .await; + break; + } + } else { + break; + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_terminal_message_serialization() { + let msg = TerminalMessage::Data { + data: "hello".to_string(), + }; + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("\"type\":\"data\"")); + assert!(json.contains("\"data\":\"hello\"")); + + let msg = TerminalMessage::Resize { cols: 80, rows: 24 }; + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains("\"type\":\"resize\"")); + assert!(json.contains("\"cols\":80")); + assert!(json.contains("\"rows\":24")); + } +}