feat: improve inspector UI for processes and fix PTY terminal

- Simplify ProcessRunTab layout: compact form with collapsible Advanced section for timeout/maxOutputBytes
- Rewrite ProcessesTab: collapsible create form, lightweight list items with status dots, clean detail panel with tabs
- Extract error details: use problem.detail instead of generic "Stream Error" title for better error messages
- Fix GhosttyTerminal binary frame parsing: handle server's binary ArrayBuffer control frames (ready/exit/error)
- Enable WebSocket proxying in Vite dev server with ws: true
- Set TERM=xterm-256color default for TTY processes so tools like tmux, vim, htop work out of the box
- Remove orange gradient background from terminal container for cleaner look
- Remove orange left border from selected process list items
- Update inspector CSS with new process/terminal styles

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-06 00:14:55 -08:00
parent c3a95c3611
commit 6dbc871db9
31 changed files with 6881 additions and 207 deletions

View file

@ -32,7 +32,7 @@ schemars = "0.8"
utoipa = { version = "4.2", features = ["axum_extras"] }
# Web framework
axum = "0.7"
axum = { version = "0.7", features = ["ws"] }
tower = { version = "0.5", features = ["util"] }
tower-http = { version = "0.5", features = ["cors", "trace"] }

File diff suppressed because it is too large Load diff

View file

@ -1,26 +1,4 @@
# Frontend Instructions
## Inspector Architecture
- Inspector source is `frontend/packages/inspector/`.
- `/ui/` must use ACP over HTTP (`/v2/rpc`) for session/prompt traffic.
- Primary flow:
- `initialize`
- `session/new`
- `session/prompt`
- `session/update` over SSE
- Keep backend/protocol changes in client bindings; avoid unnecessary full UI rewrites.
## Testing
Run inspector checks after transport or chat-flow changes:
```bash
pnpm --filter @sandbox-agent/inspector test
pnpm --filter @sandbox-agent/inspector test:agent-browser
```
## Docs Sync
- Update `docs/inspector.mdx` when `/ui/` behavior changes.
- Update `docs/sdks/typescript.mdx` when inspector SDK bindings or ACP transport behavior changes.
- When the user asks for UI changes, capture screenshots of the updated UI after implementation and verification.
- At the end, offer to open those screenshots for the user and provide absolute filesystem paths to the screenshot files.

View file

@ -2648,6 +2648,402 @@
flex-shrink: 0;
}
/* ── Process form buttons ── */
.process-run-form .button.primary,
.process-create-form .button.primary {
width: auto;
}
.process-detail > .button {
align-self: flex-start;
}
/* ── Run Once tab ── */
.process-run-container {
display: flex;
flex-direction: column;
gap: 16px;
}
.process-run-form {
display: flex;
flex-direction: column;
gap: 10px;
}
.process-run-row {
display: flex;
gap: 10px;
}
.process-run-field {
display: flex;
flex-direction: column;
gap: 4px;
}
.process-run-field-grow {
flex: 1;
min-width: 0;
}
.process-run-field .setup-input {
width: 100%;
}
.process-run-field textarea.setup-input {
resize: vertical;
min-height: 42px;
}
.process-advanced-toggle {
display: inline-flex;
align-items: center;
gap: 4px;
background: none;
border: none;
color: var(--muted);
font-size: 11px;
cursor: pointer;
padding: 2px 0;
align-self: flex-start;
}
.process-advanced-toggle:hover {
color: var(--text-secondary);
}
.process-run-result {
border: 1px solid var(--border);
border-radius: var(--radius);
overflow: hidden;
}
.process-run-result-header {
display: flex;
align-items: center;
gap: 8px;
padding: 10px 12px;
border-bottom: 1px solid var(--border);
background: var(--surface);
}
.process-run-output {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 0;
}
.process-run-output-section {
display: flex;
flex-direction: column;
min-width: 0;
}
.process-run-output-section + .process-run-output-section {
border-left: 1px solid var(--border);
}
.process-run-output-label {
padding: 6px 12px;
font-size: 10px;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.3px;
color: var(--muted);
border-bottom: 1px solid rgba(255, 255, 255, 0.06);
background: var(--surface-2);
}
.process-run-output-section .process-log-block {
border: none;
border-radius: 0;
min-height: 80px;
}
/* ── Processes tab ── */
.processes-container {
display: flex;
flex-direction: column;
gap: 20px;
}
.processes-section {
display: flex;
flex-direction: column;
gap: 8px;
}
.processes-section-toggle {
display: inline-flex;
align-items: center;
gap: 4px;
background: none;
border: none;
color: var(--text);
font-size: 12px;
font-weight: 600;
cursor: pointer;
padding: 2px 0;
align-self: flex-start;
}
.processes-section-toggle:hover {
color: var(--accent);
}
.processes-section-label {
font-size: 12px;
font-weight: 600;
color: var(--text);
}
.processes-list-header {
display: flex;
align-items: center;
justify-content: space-between;
}
.process-create-form {
display: flex;
flex-direction: column;
gap: 10px;
padding: 12px;
background: var(--surface);
border: 1px solid var(--border);
border-radius: var(--radius);
}
.process-checkbox-row {
display: flex;
flex-wrap: wrap;
gap: 14px;
}
.process-checkbox {
display: inline-flex;
align-items: center;
gap: 6px;
font-size: 11px;
color: var(--text-secondary);
cursor: pointer;
}
.process-checkbox input {
margin: 0;
}
/* Process list items */
.process-list {
display: flex;
flex-direction: column;
gap: 2px;
}
.process-list-item {
display: flex;
flex-direction: column;
gap: 4px;
padding: 8px 10px;
border-radius: var(--radius-sm);
cursor: pointer;
transition: background var(--transition);
}
.process-list-item:hover {
background: var(--surface);
}
.process-list-item.selected {
background: var(--surface);
}
.process-list-item-main {
display: flex;
align-items: center;
gap: 8px;
min-width: 0;
}
.process-status-dot {
width: 6px;
height: 6px;
border-radius: 50%;
flex-shrink: 0;
background: var(--muted);
}
.process-status-dot.running {
background: var(--success);
}
.process-status-dot.exited {
background: var(--muted);
}
.process-list-item-cmd {
font-size: 12px;
color: var(--text);
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
min-width: 0;
}
.process-list-item-meta {
display: flex;
align-items: center;
gap: 8px;
font-size: 10px;
color: var(--muted);
padding-left: 14px;
}
.process-list-item-id {
font-family: ui-monospace, SFMono-Regular, 'SF Mono', Consolas, monospace;
opacity: 0.7;
}
.process-list-item-actions {
display: flex;
gap: 4px;
padding-left: 14px;
margin-top: 2px;
}
.process-list-item-actions .button {
padding: 4px 8px;
font-size: 11px;
}
/* Process detail panel */
.process-detail {
padding: 12px;
background: var(--surface);
border: 1px solid var(--border);
border-radius: var(--radius);
display: flex;
flex-direction: column;
gap: 10px;
}
.process-detail-header {
display: flex;
align-items: center;
justify-content: space-between;
gap: 8px;
}
.process-detail-cmd {
font-size: 12px;
color: var(--text);
word-break: break-word;
}
.process-detail-meta {
display: flex;
flex-wrap: wrap;
gap: 6px 14px;
font-size: 11px;
color: var(--muted);
}
.process-detail-logs {
display: flex;
flex-direction: column;
gap: 6px;
}
.process-detail-logs-header {
display: flex;
align-items: center;
justify-content: space-between;
}
.process-detail-logs-header .button {
padding: 4px 8px;
font-size: 11px;
}
/* Terminal (shared) */
.process-terminal-shell {
margin-top: 4px;
border: 1px solid rgba(255, 255, 255, 0.1);
border-radius: 10px;
overflow: hidden;
background: rgba(0, 0, 0, 0.3);
}
.process-terminal-meta {
display: flex;
align-items: center;
justify-content: space-between;
gap: 12px;
padding: 8px 12px;
border-bottom: 1px solid rgba(255, 255, 255, 0.08);
background: rgba(0, 0, 0, 0.2);
font-size: 11px;
color: var(--text-secondary);
}
.process-terminal-status {
display: inline-flex;
align-items: center;
gap: 6px;
color: var(--muted);
}
.process-terminal-status.ready {
color: var(--success);
}
.process-terminal-status.error {
color: var(--danger);
}
.process-terminal-status.closed {
color: var(--warning);
}
.process-terminal-host {
min-height: 320px;
max-height: 480px;
overflow: hidden;
padding: 10px;
}
.process-terminal-host > div {
width: 100%;
height: 100%;
}
.process-terminal-empty {
margin-top: 4px;
padding: 10px 12px;
border: 1px dashed rgba(255, 255, 255, 0.1);
border-radius: var(--radius-sm);
color: var(--muted);
font-size: 11px;
}
/* Log block (shared) */
.process-log-block {
margin: 0;
min-height: 80px;
max-height: 280px;
overflow: auto;
padding: 10px 12px;
border-radius: var(--radius);
border: 1px solid rgba(255, 255, 255, 0.08);
background: rgba(9, 9, 11, 0.95);
color: #e4e4e7;
font-family: ui-monospace, SFMono-Regular, 'SF Mono', Consolas, monospace;
font-size: 11px;
line-height: 1.55;
white-space: pre-wrap;
word-break: break-word;
}
.pill {
display: inline-flex;
align-items: center;
@ -3026,6 +3422,26 @@
flex-shrink: 0;
}
@media (max-width: 900px) {
.process-run-row {
flex-direction: column;
}
.process-run-output {
grid-template-columns: 1fr;
}
.process-run-output-section + .process-run-output-section {
border-left: none;
border-top: 1px solid var(--border);
}
.process-terminal-meta {
flex-direction: column;
align-items: flex-start;
}
}
/* Scrollbar - match landing page */
* {
scrollbar-width: thin;

View file

@ -23,6 +23,7 @@
},
"dependencies": {
"@sandbox-agent/persist-indexeddb": "workspace:*",
"ghostty-web": "^0.4.0",
"lucide-react": "^0.469.0",
"react": "^18.3.1",
"react-dom": "^18.3.1"

View file

@ -1,15 +1,17 @@
import { ChevronLeft, ChevronRight, Cloud, PlayCircle, Server, Terminal, Wrench } from "lucide-react";
import { ChevronLeft, ChevronRight, Cloud, Play, PlayCircle, Server, Terminal, Wrench } from "lucide-react";
import type { AgentInfo, SandboxAgent, SessionEvent } from "sandbox-agent";
type AgentModeInfo = { id: string; name: string; description: string };
import AgentsTab from "./AgentsTab";
import EventsTab from "./EventsTab";
import McpTab from "./McpTab";
import ProcessesTab from "./ProcessesTab";
import ProcessRunTab from "./ProcessRunTab";
import SkillsTab from "./SkillsTab";
import RequestLogTab from "./RequestLogTab";
import type { RequestLog } from "../../types/requestLog";
export type DebugTab = "log" | "events" | "agents" | "mcp" | "skills";
export type DebugTab = "log" | "events" | "agents" | "mcp" | "skills" | "processes" | "run-process";
const DebugPanel = ({
debugTab,
@ -81,6 +83,14 @@ const DebugPanel = ({
<Server className="button-icon" style={{ marginRight: 4, width: 12, height: 12 }} />
MCP
</button>
<button className={`debug-tab ${debugTab === "processes" ? "active" : ""}`} onClick={() => onDebugTabChange("processes")}>
<Terminal className="button-icon" style={{ marginRight: 4, width: 12, height: 12 }} />
Processes
</button>
<button className={`debug-tab ${debugTab === "run-process" ? "active" : ""}`} onClick={() => onDebugTabChange("run-process")}>
<Play className="button-icon" style={{ marginRight: 4, width: 12, height: 12 }} />
Run Once
</button>
<button className={`debug-tab ${debugTab === "skills" ? "active" : ""}`} onClick={() => onDebugTabChange("skills")}>
<Wrench className="button-icon" style={{ marginRight: 4, width: 12, height: 12 }} />
Skills
@ -122,6 +132,14 @@ const DebugPanel = ({
<McpTab getClient={getClient} />
)}
{debugTab === "processes" && (
<ProcessesTab getClient={getClient} />
)}
{debugTab === "run-process" && (
<ProcessRunTab getClient={getClient} />
)}
{debugTab === "skills" && (
<SkillsTab getClient={getClient} />
)}

View file

@ -0,0 +1,165 @@
import { ChevronDown, ChevronRight, Loader2, Play } from "lucide-react";
import { useState } from "react";
import { SandboxAgentError } from "sandbox-agent";
import type { ProcessRunResponse, SandboxAgent } from "sandbox-agent";
const parseArgs = (value: string): string[] => value.split("\n").map((part) => part.trim()).filter(Boolean);
const ProcessRunTab = ({
getClient,
}: {
getClient: () => SandboxAgent;
}) => {
const [command, setCommand] = useState("");
const [argsText, setArgsText] = useState("");
const [cwd, setCwd] = useState("");
const [timeoutMs, setTimeoutMs] = useState("30000");
const [maxOutputBytes, setMaxOutputBytes] = useState("");
const [showAdvanced, setShowAdvanced] = useState(false);
const [running, setRunning] = useState(false);
const [error, setError] = useState<string | null>(null);
const [result, setResult] = useState<ProcessRunResponse | null>(null);
const handleRun = async () => {
const trimmedCommand = command.trim();
if (!trimmedCommand) {
setError("Command is required.");
return;
}
setRunning(true);
setError(null);
try {
const response = await getClient().runProcess({
command: trimmedCommand,
args: parseArgs(argsText),
cwd: cwd.trim() || undefined,
timeoutMs: timeoutMs.trim() ? Number(timeoutMs) : undefined,
maxOutputBytes: maxOutputBytes.trim() ? Number(maxOutputBytes) : undefined,
});
setResult(response);
} catch (runError) {
const detail = runError instanceof SandboxAgentError ? runError.problem?.detail : undefined;
setError(detail || (runError instanceof Error ? runError.message : "Unable to run process."));
setResult(null);
} finally {
setRunning(false);
}
};
return (
<div className="process-run-container">
<div className="process-run-form">
<div className="process-run-row">
<div className="process-run-field process-run-field-grow">
<label className="label">Command</label>
<input
className="setup-input mono"
value={command}
onChange={(event) => {
setCommand(event.target.value);
setError(null);
}}
placeholder="bash"
/>
</div>
<div className="process-run-field process-run-field-grow">
<label className="label">Working Directory</label>
<input
className="setup-input mono"
value={cwd}
onChange={(event) => {
setCwd(event.target.value);
setError(null);
}}
placeholder="/workspace"
/>
</div>
</div>
<div className="process-run-field">
<label className="label">Arguments</label>
<textarea
className="setup-input mono"
rows={2}
value={argsText}
onChange={(event) => {
setArgsText(event.target.value);
setError(null);
}}
placeholder={"One argument per line, e.g.\n-lc"}
/>
</div>
<button
className="process-advanced-toggle"
onClick={() => setShowAdvanced((prev) => !prev)}
type="button"
>
{showAdvanced ? <ChevronDown size={12} /> : <ChevronRight size={12} />}
Advanced
</button>
{showAdvanced && (
<div className="process-run-row">
<div className="process-run-field process-run-field-grow">
<label className="label">Timeout (ms)</label>
<input
className="setup-input mono"
value={timeoutMs}
onChange={(event) => {
setTimeoutMs(event.target.value);
setError(null);
}}
placeholder="30000"
/>
</div>
<div className="process-run-field process-run-field-grow">
<label className="label">Max Output Bytes</label>
<input
className="setup-input mono"
value={maxOutputBytes}
onChange={(event) => {
setMaxOutputBytes(event.target.value);
setError(null);
}}
placeholder="Default"
/>
</div>
</div>
)}
{error ? <div className="banner error">{error}</div> : null}
<button className="button primary small" onClick={() => void handleRun()} disabled={running} style={{ alignSelf: "flex-start" }}>
{running ? <Loader2 className="button-icon spinner-icon" /> : <Play className="button-icon" />}
{running ? "Running..." : "Run"}
</button>
</div>
{result ? (
<div className="process-run-result">
<div className="process-run-result-header">
<span className={`pill ${result.timedOut ? "warning" : result.exitCode === 0 ? "success" : "danger"}`}>
{result.timedOut ? "Timed Out" : `exit ${result.exitCode ?? "?"}`}
</span>
<span className="card-meta">{result.durationMs}ms</span>
</div>
<div className="process-run-output">
<div className="process-run-output-section">
<div className="process-run-output-label">stdout{result.stdoutTruncated ? " (truncated)" : ""}</div>
<pre className="process-log-block">{result.stdout || "(empty)"}</pre>
</div>
<div className="process-run-output-section">
<div className="process-run-output-label">stderr{result.stderrTruncated ? " (truncated)" : ""}</div>
<pre className="process-log-block">{result.stderr || "(empty)"}</pre>
</div>
</div>
</div>
) : null}
</div>
);
};
export default ProcessRunTab;

View file

@ -0,0 +1,430 @@
import { ChevronDown, ChevronRight, Loader2, Play, RefreshCw, Skull, SquareTerminal, Trash2 } from "lucide-react";
import { useCallback, useEffect, useMemo, useState } from "react";
import { SandboxAgentError } from "sandbox-agent";
import type { ProcessInfo, SandboxAgent } from "sandbox-agent";
import GhosttyTerminal from "../processes/GhosttyTerminal";
const extractErrorMessage = (error: unknown, fallback: string): string => {
if (error instanceof SandboxAgentError && error.problem?.detail) return error.problem.detail;
if (error instanceof Error) return error.message;
return fallback;
};
const decodeBase64Utf8 = (value: string): string => {
try {
const bytes = Uint8Array.from(window.atob(value), (char) => char.charCodeAt(0));
return new TextDecoder().decode(bytes);
} catch {
return value;
}
};
const formatDateTime = (value: number | null | undefined): string => {
if (!value) {
return "Unknown";
}
return new Date(value).toLocaleString();
};
const parseArgs = (value: string): string[] => value.split("\n").map((part) => part.trim()).filter(Boolean);
const formatCommandSummary = (process: Pick<ProcessInfo, "command" | "args">): string => {
return [process.command, ...process.args].join(" ").trim();
};
const canOpenTerminal = (process: ProcessInfo | null | undefined): boolean => {
return Boolean(process && process.status === "running" && process.interactive && process.tty);
};
const ProcessesTab = ({
getClient,
}: {
getClient: () => SandboxAgent;
}) => {
const [processes, setProcesses] = useState<ProcessInfo[]>([]);
const [loading, setLoading] = useState(false);
const [refreshing, setRefreshing] = useState(false);
const [error, setError] = useState<string | null>(null);
const [command, setCommand] = useState("");
const [argsText, setArgsText] = useState("");
const [cwd, setCwd] = useState("");
const [interactive, setInteractive] = useState(true);
const [tty, setTty] = useState(true);
const [creating, setCreating] = useState(false);
const [createError, setCreateError] = useState<string | null>(null);
const [showCreateForm, setShowCreateForm] = useState(true);
const [selectedProcessId, setSelectedProcessId] = useState<string | null>(null);
const [logsText, setLogsText] = useState("");
const [logsLoading, setLogsLoading] = useState(false);
const [logsError, setLogsError] = useState<string | null>(null);
const [terminalOpen, setTerminalOpen] = useState(false);
const [actingProcessId, setActingProcessId] = useState<string | null>(null);
const loadProcesses = useCallback(async (mode: "initial" | "refresh" = "initial") => {
if (mode === "initial") {
setLoading(true);
} else {
setRefreshing(true);
}
setError(null);
try {
const response = await getClient().listProcesses();
setProcesses(response.processes);
setSelectedProcessId((current) => {
if (!current) {
return response.processes[0]?.id ?? null;
}
return response.processes.some((listedProcess) => listedProcess.id === current)
? current
: response.processes[0]?.id ?? null;
});
} catch (loadError) {
setError(extractErrorMessage(loadError, "Unable to load processes."));
} finally {
setLoading(false);
setRefreshing(false);
}
}, [getClient]);
const loadSelectedLogs = useCallback(async (process: ProcessInfo | null) => {
if (!process) {
setLogsText("");
setLogsError(null);
return;
}
setLogsLoading(true);
setLogsError(null);
try {
const response = await getClient().getProcessLogs(process.id, {
stream: process.tty ? "pty" : "combined",
tail: 200,
});
const text = response.entries.map((logEntry) => decodeBase64Utf8(logEntry.data)).join("");
setLogsText(text);
} catch (loadError) {
setLogsError(extractErrorMessage(loadError, "Unable to load process logs."));
setLogsText("");
} finally {
setLogsLoading(false);
}
}, [getClient]);
useEffect(() => {
void loadProcesses();
}, [loadProcesses]);
const selectedProcess = useMemo(
() => processes.find((process) => process.id === selectedProcessId) ?? null,
[processes, selectedProcessId]
);
useEffect(() => {
void loadSelectedLogs(selectedProcess);
if (!canOpenTerminal(selectedProcess)) {
setTerminalOpen(false);
}
}, [loadSelectedLogs, selectedProcess]);
const handleCreateProcess = async () => {
const trimmedCommand = command.trim();
if (!trimmedCommand) {
setCreateError("Command is required.");
return;
}
setCreating(true);
setCreateError(null);
try {
const created = await getClient().createProcess({
command: trimmedCommand,
args: parseArgs(argsText),
cwd: cwd.trim() || undefined,
interactive,
tty,
});
await loadProcesses("refresh");
setSelectedProcessId(created.id);
setTerminalOpen(created.interactive && created.tty);
setCommand("");
setArgsText("");
setCwd("");
setInteractive(true);
setTty(true);
} catch (createFailure) {
setCreateError(extractErrorMessage(createFailure, "Unable to create process."));
} finally {
setCreating(false);
}
};
const handleAction = async (processId: string, action: "stop" | "kill" | "delete") => {
setActingProcessId(`${action}:${processId}`);
setError(null);
try {
const client = getClient();
if (action === "stop") {
await client.stopProcess(processId, { waitMs: 2_000 });
} else if (action === "kill") {
await client.killProcess(processId, { waitMs: 2_000 });
} else {
await client.deleteProcess(processId);
}
await loadProcesses("refresh");
} catch (actionError) {
setError(extractErrorMessage(actionError, `Unable to ${action} process.`));
} finally {
setActingProcessId(null);
}
};
return (
<div className="processes-container">
{/* Create form */}
<div className="processes-section">
<button
className="processes-section-toggle"
onClick={() => setShowCreateForm((prev) => !prev)}
type="button"
>
{showCreateForm ? <ChevronDown size={12} /> : <ChevronRight size={12} />}
<span>Create Process</span>
</button>
{showCreateForm && (
<div className="process-create-form">
<div className="process-run-row">
<div className="process-run-field process-run-field-grow">
<label className="label">Command</label>
<input
className="setup-input mono"
value={command}
onChange={(event) => {
setCommand(event.target.value);
setCreateError(null);
}}
placeholder="bash"
/>
</div>
<div className="process-run-field process-run-field-grow">
<label className="label">Working Directory</label>
<input
className="setup-input mono"
value={cwd}
onChange={(event) => {
setCwd(event.target.value);
setCreateError(null);
}}
placeholder="/workspace"
/>
</div>
</div>
<div className="process-run-field">
<label className="label">Arguments</label>
<textarea
className="setup-input mono"
rows={2}
value={argsText}
onChange={(event) => {
setArgsText(event.target.value);
setCreateError(null);
}}
placeholder={"One argument per line"}
/>
</div>
<div className="process-checkbox-row">
<label className="process-checkbox">
<input
type="checkbox"
checked={interactive}
onChange={(event) => {
setInteractive(event.target.checked);
if (!event.target.checked) {
setTty(false);
}
}}
/>
<span>interactive</span>
</label>
<label className="process-checkbox">
<input
type="checkbox"
checked={tty}
onChange={(event) => {
setTty(event.target.checked);
if (event.target.checked) {
setInteractive(true);
}
}}
/>
<span>tty</span>
</label>
</div>
{createError ? <div className="banner error">{createError}</div> : null}
<button className="button primary small" onClick={() => void handleCreateProcess()} disabled={creating} style={{ alignSelf: "flex-start" }}>
{creating ? <Loader2 className="button-icon spinner-icon" /> : <Play className="button-icon" />}
{creating ? "Creating..." : "Create"}
</button>
</div>
)}
</div>
{/* Process list */}
<div className="processes-section">
<div className="processes-list-header">
<span className="processes-section-label">Processes</span>
<button className="button secondary small" onClick={() => void loadProcesses("refresh")} disabled={loading || refreshing}>
<RefreshCw className={`button-icon ${loading || refreshing ? "spinner-icon" : ""}`} size={12} />
Refresh
</button>
</div>
{error ? <div className="banner error">{error}</div> : null}
{loading ? <div className="card-meta">Loading...</div> : null}
{!loading && processes.length === 0 ? <div className="card-meta">No processes yet.</div> : null}
<div className="process-list">
{processes.map((process) => {
const isSelected = selectedProcessId === process.id;
const isStopping = actingProcessId === `stop:${process.id}`;
const isKilling = actingProcessId === `kill:${process.id}`;
const isDeleting = actingProcessId === `delete:${process.id}`;
return (
<div
key={process.id}
className={`process-list-item ${isSelected ? "selected" : ""}`}
onClick={() => {
setSelectedProcessId(process.id);
setTerminalOpen(false);
}}
>
<div className="process-list-item-main">
<span className={`process-status-dot ${process.status}`} />
<span className="process-list-item-cmd mono">{formatCommandSummary(process)}</span>
{process.interactive && process.tty && (
<span className="pill neutral" style={{ fontSize: 9 }}>tty</span>
)}
</div>
<div className="process-list-item-meta">
<span>PID {process.pid ?? "?"}</span>
<span className="process-list-item-id">{process.id.slice(0, 8)}</span>
</div>
<div className="process-list-item-actions">
{canOpenTerminal(process) ? (
<button
className="button secondary small"
onClick={(e) => {
e.stopPropagation();
setSelectedProcessId(process.id);
setTerminalOpen(true);
}}
>
<SquareTerminal className="button-icon" size={12} />
Terminal
</button>
) : null}
{process.status === "running" ? (
<>
<button
className="button secondary small"
onClick={(e) => { e.stopPropagation(); void handleAction(process.id, "stop"); }}
disabled={Boolean(actingProcessId)}
>
{isStopping ? <Loader2 className="button-icon spinner-icon" size={12} /> : null}
Stop
</button>
<button
className="button secondary small"
onClick={(e) => { e.stopPropagation(); void handleAction(process.id, "kill"); }}
disabled={Boolean(actingProcessId)}
>
{isKilling ? <Loader2 className="button-icon spinner-icon" size={12} /> : <Skull className="button-icon" size={12} />}
Kill
</button>
</>
) : null}
{process.status === "exited" ? (
<button
className="button secondary small"
onClick={(e) => { e.stopPropagation(); void handleAction(process.id, "delete"); }}
disabled={Boolean(actingProcessId)}
>
{isDeleting ? <Loader2 className="button-icon spinner-icon" size={12} /> : <Trash2 className="button-icon" size={12} />}
Delete
</button>
) : null}
</div>
</div>
);
})}
</div>
</div>
{/* Selected process detail */}
{selectedProcess ? (
<div className="processes-section">
<div className="processes-section-label">Detail</div>
<div className="process-detail">
<div className="process-detail-header">
<span className="process-detail-cmd mono">{formatCommandSummary(selectedProcess)}</span>
<span className={`pill ${selectedProcess.status === "running" ? "success" : "neutral"}`}>{selectedProcess.status}</span>
</div>
<div className="process-detail-meta">
<span>PID: {selectedProcess.pid ?? "?"}</span>
<span>Created: {formatDateTime(selectedProcess.createdAtMs)}</span>
{selectedProcess.exitedAtMs ? <span>Exited: {formatDateTime(selectedProcess.exitedAtMs)}</span> : null}
{selectedProcess.exitCode != null ? <span>Exit code: {selectedProcess.exitCode}</span> : null}
<span className="mono" style={{ opacity: 0.6 }}>{selectedProcess.id}</span>
</div>
{/* Terminal */}
{terminalOpen && canOpenTerminal(selectedProcess) ? (
<GhosttyTerminal
client={getClient()}
processId={selectedProcess.id}
onExit={() => {
void loadProcesses("refresh");
}}
/>
) : canOpenTerminal(selectedProcess) ? (
<button
className="button secondary small"
onClick={() => setTerminalOpen(true)}
style={{ marginTop: 8 }}
>
<SquareTerminal className="button-icon" size={12} />
Open Terminal
</button>
) : selectedProcess.interactive && selectedProcess.tty ? (
<div className="process-terminal-empty">
Terminal available while process is running.
</div>
) : null}
{/* Logs */}
<div className="process-detail-logs">
<div className="process-detail-logs-header">
<span className="label">Logs</span>
<button className="button secondary small" onClick={() => void loadSelectedLogs(selectedProcess)} disabled={logsLoading}>
{logsLoading ? <Loader2 className="button-icon spinner-icon" size={12} /> : <RefreshCw className="button-icon" size={12} />}
Refresh
</button>
</div>
{logsError ? <div className="banner error">{logsError}</div> : null}
<pre className="process-log-block">{logsText || (logsLoading ? "Loading..." : "(no output)")}</pre>
</div>
</div>
</div>
) : null}
</div>
);
};
export default ProcessesTab;

View file

@ -0,0 +1,263 @@
import { AlertCircle, Loader2, PlugZap, SquareTerminal } from "lucide-react";
import { FitAddon, Terminal, init } from "ghostty-web";
import { useEffect, useRef, useState } from "react";
import type { ProcessTerminalServerFrame, SandboxAgent } from "sandbox-agent";
type ConnectionState = "connecting" | "ready" | "closed" | "error";
const terminalTheme = {
background: "#09090b",
foreground: "#f4f4f5",
cursor: "#f97316",
cursorAccent: "#09090b",
selectionBackground: "#27272a",
black: "#18181b",
red: "#f87171",
green: "#4ade80",
yellow: "#fbbf24",
blue: "#60a5fa",
magenta: "#f472b6",
cyan: "#22d3ee",
white: "#e4e4e7",
brightBlack: "#3f3f46",
brightRed: "#fb7185",
brightGreen: "#86efac",
brightYellow: "#fde047",
brightBlue: "#93c5fd",
brightMagenta: "#f9a8d4",
brightCyan: "#67e8f9",
brightWhite: "#fafafa",
};
const toUint8Array = async (data: Blob | ArrayBuffer): Promise<Uint8Array> => {
if (data instanceof ArrayBuffer) {
return new Uint8Array(data);
}
return new Uint8Array(await data.arrayBuffer());
};
const isServerFrame = (value: unknown): value is ProcessTerminalServerFrame => {
if (!value || typeof value !== "object") {
return false;
}
const type = (value as { type?: unknown }).type;
return type === "ready" || type === "exit" || type === "error";
};
const GhosttyTerminal = ({
client,
processId,
onExit,
}: {
client: SandboxAgent;
processId: string;
onExit?: () => void;
}) => {
const hostRef = useRef<HTMLDivElement | null>(null);
const [connectionState, setConnectionState] = useState<ConnectionState>("connecting");
const [statusMessage, setStatusMessage] = useState("Connecting to PTY...");
const [exitCode, setExitCode] = useState<number | null>(null);
useEffect(() => {
let cancelled = false;
let terminal: Terminal | null = null;
let fitAddon: FitAddon | null = null;
let socket: WebSocket | null = null;
let resizeRaf = 0;
let removeDataListener: { dispose(): void } | null = null;
let removeResizeListener: { dispose(): void } | null = null;
const sendFrame = (payload: unknown) => {
if (!socket || socket.readyState !== WebSocket.OPEN) {
return;
}
socket.send(JSON.stringify(payload));
};
const syncSize = () => {
if (!terminal) {
return;
}
sendFrame({
type: "resize",
cols: terminal.cols,
rows: terminal.rows,
});
};
const connect = async () => {
try {
await init();
if (cancelled || !hostRef.current) {
return;
}
terminal = new Terminal({
allowTransparency: true,
cursorBlink: true,
cursorStyle: "block",
fontFamily: "ui-monospace, SFMono-Regular, SF Mono, Menlo, monospace",
fontSize: 13,
smoothScrollDuration: 90,
theme: terminalTheme,
});
fitAddon = new FitAddon();
terminal.open(hostRef.current);
terminal.loadAddon(fitAddon);
fitAddon.fit();
fitAddon.observeResize();
terminal.focus();
removeDataListener = terminal.onData((data) => {
sendFrame({ type: "input", data });
});
removeResizeListener = terminal.onResize(() => {
if (resizeRaf) {
window.cancelAnimationFrame(resizeRaf);
}
resizeRaf = window.requestAnimationFrame(syncSize);
});
const nextSocket = client.connectProcessTerminalWebSocket(processId);
socket = nextSocket;
nextSocket.binaryType = "arraybuffer";
const tryParseControlFrame = (raw: string | ArrayBuffer | Blob): ProcessTerminalServerFrame | null => {
let text: string | undefined;
if (typeof raw === "string") {
text = raw;
} else if (raw instanceof ArrayBuffer) {
// Server may send JSON control frames as binary; try to decode small messages as JSON.
if (raw.byteLength < 256) {
try {
text = new TextDecoder().decode(raw);
} catch {
// not decodable, treat as terminal data
}
}
}
if (!text) return null;
try {
const parsed = JSON.parse(text);
return isServerFrame(parsed) ? parsed : null;
} catch {
return null;
}
};
const handleControlFrame = (frame: ProcessTerminalServerFrame): void => {
if (frame.type === "ready") {
setConnectionState("ready");
setStatusMessage("Connected");
syncSize();
return;
}
if (frame.type === "exit") {
setConnectionState("closed");
setExitCode(frame.exitCode ?? null);
setStatusMessage(
frame.exitCode == null ? "Process exited." : `Process exited with code ${frame.exitCode}.`
);
onExit?.();
return;
}
if (frame.type === "error") {
setConnectionState("error");
setStatusMessage(frame.message);
}
};
nextSocket.addEventListener("message", (event) => {
if (cancelled || !terminal) {
return;
}
const controlFrame = tryParseControlFrame(event.data);
if (controlFrame) {
handleControlFrame(controlFrame);
return;
}
void toUint8Array(event.data).then((bytes) => {
if (!cancelled && terminal) {
terminal.write(bytes);
}
});
});
nextSocket.addEventListener("close", () => {
if (cancelled) {
return;
}
setConnectionState((current) => (current === "error" ? current : "closed"));
setStatusMessage((current) => (current === "Connected" ? "Terminal disconnected." : current));
});
nextSocket.addEventListener("error", () => {
if (cancelled) {
return;
}
setConnectionState("error");
setStatusMessage("WebSocket connection failed.");
});
} catch (error) {
if (cancelled) {
return;
}
setConnectionState("error");
setStatusMessage(error instanceof Error ? error.message : "Failed to initialize Ghostty terminal.");
}
};
void connect();
return () => {
cancelled = true;
if (resizeRaf) {
window.cancelAnimationFrame(resizeRaf);
}
removeDataListener?.dispose();
removeResizeListener?.dispose();
if (socket?.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify({ type: "close" }));
socket.close();
} else if (socket?.readyState === WebSocket.CONNECTING) {
const pendingSocket = socket;
pendingSocket.addEventListener("open", () => {
pendingSocket.close();
}, { once: true });
}
terminal?.dispose();
};
}, [client, onExit, processId]);
return (
<div className="process-terminal-shell">
<div className="process-terminal-meta">
<div className="inline-row">
<SquareTerminal size={13} />
<span>Ghostty PTY</span>
</div>
<div className={`process-terminal-status ${connectionState}`}>
{connectionState === "connecting" ? <Loader2 size={12} className="spinner-icon" /> : null}
{connectionState === "ready" ? <PlugZap size={12} /> : null}
{connectionState === "error" ? <AlertCircle size={12} /> : null}
<span>{statusMessage}</span>
{exitCode != null ? <span className="mono">exit={exitCode}</span> : null}
</div>
</div>
<div
ref={hostRef}
className="process-terminal-host"
role="presentation"
onClick={() => {
hostRef.current?.querySelector("textarea")?.focus();
}}
/>
</div>
);
};
export default GhosttyTerminal;

View file

@ -10,6 +10,7 @@ export default defineConfig(({ command }) => ({
"/v1": {
target: "http://localhost:2468",
changeOrigin: true,
ws: true,
},
},
},

388
pnpm-lock.yaml generated

File diff suppressed because it is too large Load diff

View file

@ -74,6 +74,10 @@ describe("AcpHttpClient integration", () => {
timeoutMs: 30000,
env: {
XDG_DATA_HOME: dataHome,
HOME: dataHome,
USERPROFILE: dataHome,
APPDATA: join(dataHome, "AppData", "Roaming"),
LOCALAPPDATA: join(dataHome, "AppData", "Local"),
},
});
baseUrl = handle.baseUrl;

View file

@ -60,6 +60,10 @@ describe("IndexedDB persistence end-to-end", () => {
timeoutMs: 30000,
env: {
XDG_DATA_HOME: dataHome,
HOME: dataHome,
USERPROFILE: dataHome,
APPDATA: join(dataHome, "AppData", "Roaming"),
LOCALAPPDATA: join(dataHome, "AppData", "Local"),
},
});
baseUrl = handle.baseUrl;

View file

@ -64,6 +64,10 @@ describe("Postgres persistence driver", () => {
timeoutMs: 30000,
env: {
XDG_DATA_HOME: dataHome,
HOME: dataHome,
USERPROFILE: dataHome,
APPDATA: join(dataHome, "AppData", "Roaming"),
LOCALAPPDATA: join(dataHome, "AppData", "Local"),
},
});
baseUrl = handle.baseUrl;

View file

@ -55,6 +55,10 @@ describe("SQLite persistence driver", () => {
timeoutMs: 30000,
env: {
XDG_DATA_HOME: dataHome,
HOME: dataHome,
USERPROFILE: dataHome,
APPDATA: join(dataHome, "AppData", "Roaming"),
LOCALAPPDATA: join(dataHome, "AppData", "Local"),
},
});
baseUrl = handle.baseUrl;

View file

@ -17,8 +17,8 @@
}
},
"dependencies": {
"acp-http-client": "workspace:*",
"@sandbox-agent/cli-shared": "workspace:*"
"@sandbox-agent/cli-shared": "workspace:*",
"acp-http-client": "workspace:*"
},
"files": [
"dist"
@ -34,10 +34,12 @@
},
"devDependencies": {
"@types/node": "^22.0.0",
"@types/ws": "^8.18.1",
"openapi-typescript": "^6.7.0",
"tsup": "^8.0.0",
"typescript": "^5.7.0",
"vitest": "^3.0.0"
"vitest": "^3.0.0",
"ws": "^8.19.0"
},
"optionalDependencies": {
"@sandbox-agent/cli": "workspace:*"

View file

@ -39,6 +39,20 @@ import {
type McpConfigQuery,
type McpServerConfig,
type ProblemDetails,
type ProcessConfig,
type ProcessCreateRequest,
type ProcessInfo,
type ProcessInputRequest,
type ProcessInputResponse,
type ProcessListResponse,
type ProcessLogEntry,
type ProcessLogsQuery,
type ProcessLogsResponse,
type ProcessRunRequest,
type ProcessRunResponse,
type ProcessSignalQuery,
type ProcessTerminalResizeRequest,
type ProcessTerminalResizeResponse,
type SessionEvent,
type SessionPersistDriver,
type SessionRecord,
@ -98,6 +112,27 @@ export interface SessionSendOptions {
}
export type SessionEventListener = (event: SessionEvent) => void;
export type ProcessLogListener = (entry: ProcessLogEntry) => void;
export type ProcessLogFollowQuery = Omit<ProcessLogsQuery, "follow">;
export interface AgentQueryOptions {
config?: boolean;
noCache?: boolean;
}
export interface ProcessLogSubscription {
close(): void;
closed: Promise<void>;
}
export interface ProcessTerminalWebSocketUrlOptions {
accessToken?: string;
}
export interface ProcessTerminalConnectOptions extends ProcessTerminalWebSocketUrlOptions {
protocols?: string | string[];
WebSocket?: typeof WebSocket;
}
export class SandboxAgentError extends Error {
readonly status: number;
@ -674,15 +709,15 @@ export class SandboxAgent {
return this.requestJson("GET", `${API_PREFIX}/health`);
}
async listAgents(options?: { config?: boolean }): Promise<AgentListResponse> {
async listAgents(options?: AgentQueryOptions): Promise<AgentListResponse> {
return this.requestJson("GET", `${API_PREFIX}/agents`, {
query: options?.config ? { config: "true" } : undefined,
query: toAgentQuery(options),
});
}
async getAgent(agent: string, options?: { config?: boolean }): Promise<AgentInfo> {
async getAgent(agent: string, options?: AgentQueryOptions): Promise<AgentInfo> {
return this.requestJson("GET", `${API_PREFIX}/agents/${encodeURIComponent(agent)}`, {
query: options?.config ? { config: "true" } : undefined,
query: toAgentQuery(options),
});
}
@ -771,6 +806,134 @@ export class SandboxAgent {
await this.requestRaw("DELETE", `${API_PREFIX}/config/skills`, { query });
}
async getProcessConfig(): Promise<ProcessConfig> {
return this.requestJson("GET", `${API_PREFIX}/processes/config`);
}
async setProcessConfig(config: ProcessConfig): Promise<ProcessConfig> {
return this.requestJson("POST", `${API_PREFIX}/processes/config`, {
body: config,
});
}
async createProcess(request: ProcessCreateRequest): Promise<ProcessInfo> {
return this.requestJson("POST", `${API_PREFIX}/processes`, {
body: request,
});
}
async runProcess(request: ProcessRunRequest): Promise<ProcessRunResponse> {
return this.requestJson("POST", `${API_PREFIX}/processes/run`, {
body: request,
});
}
async listProcesses(): Promise<ProcessListResponse> {
return this.requestJson("GET", `${API_PREFIX}/processes`);
}
async getProcess(id: string): Promise<ProcessInfo> {
return this.requestJson("GET", `${API_PREFIX}/processes/${encodeURIComponent(id)}`);
}
async stopProcess(id: string, query?: ProcessSignalQuery): Promise<ProcessInfo> {
return this.requestJson("POST", `${API_PREFIX}/processes/${encodeURIComponent(id)}/stop`, {
query,
});
}
async killProcess(id: string, query?: ProcessSignalQuery): Promise<ProcessInfo> {
return this.requestJson("POST", `${API_PREFIX}/processes/${encodeURIComponent(id)}/kill`, {
query,
});
}
async deleteProcess(id: string): Promise<void> {
await this.requestRaw("DELETE", `${API_PREFIX}/processes/${encodeURIComponent(id)}`);
}
async getProcessLogs(id: string, query: ProcessLogFollowQuery = {}): Promise<ProcessLogsResponse> {
return this.requestJson("GET", `${API_PREFIX}/processes/${encodeURIComponent(id)}/logs`, {
query,
});
}
async followProcessLogs(
id: string,
listener: ProcessLogListener,
query: ProcessLogFollowQuery = {},
): Promise<ProcessLogSubscription> {
const abortController = new AbortController();
const response = await this.requestRaw(
"GET",
`${API_PREFIX}/processes/${encodeURIComponent(id)}/logs`,
{
query: { ...query, follow: true },
accept: "text/event-stream",
signal: abortController.signal,
},
);
if (!response.body) {
abortController.abort();
throw new Error("SSE stream is not readable in this environment.");
}
const closed = consumeProcessLogSse(response.body, listener, abortController.signal);
return {
close: () => abortController.abort(),
closed,
};
}
async sendProcessInput(id: string, request: ProcessInputRequest): Promise<ProcessInputResponse> {
return this.requestJson("POST", `${API_PREFIX}/processes/${encodeURIComponent(id)}/input`, {
body: request,
});
}
async resizeProcessTerminal(
id: string,
request: ProcessTerminalResizeRequest,
): Promise<ProcessTerminalResizeResponse> {
return this.requestJson(
"POST",
`${API_PREFIX}/processes/${encodeURIComponent(id)}/terminal/resize`,
{
body: request,
},
);
}
buildProcessTerminalWebSocketUrl(
id: string,
options: ProcessTerminalWebSocketUrlOptions = {},
): string {
return toWebSocketUrl(
this.buildUrl(`${API_PREFIX}/processes/${encodeURIComponent(id)}/terminal/ws`, {
access_token: options.accessToken ?? this.token,
}),
);
}
connectProcessTerminalWebSocket(
id: string,
options: ProcessTerminalConnectOptions = {},
): WebSocket {
const WebSocketCtor = options.WebSocket ?? globalThis.WebSocket;
if (!WebSocketCtor) {
throw new Error("WebSocket API is not available; provide a WebSocket implementation.");
}
return new WebSocketCtor(
this.buildProcessTerminalWebSocketUrl(id, {
accessToken: options.accessToken,
}),
options.protocols,
);
}
private async getLiveConnection(agent: string): Promise<LiveAcpConnection> {
const existing = this.liveConnections.get(agent);
if (existing) {
@ -1068,6 +1231,17 @@ async function autoAuthenticate(acp: AcpHttpClient, methods: AuthMethod[]): Prom
}
}
function toAgentQuery(options: AgentQueryOptions | undefined): Record<string, QueryValue> | undefined {
if (!options) {
return undefined;
}
return {
config: options.config,
no_cache: options.noCache,
};
}
function normalizeSessionInit(
value: Omit<NewSessionRequest, "_meta"> | undefined,
): Omit<NewSessionRequest, "_meta"> {
@ -1230,3 +1404,93 @@ async function readProblem(response: Response): Promise<ProblemDetails | undefin
return undefined;
}
}
async function consumeProcessLogSse(
body: ReadableStream<Uint8Array>,
listener: ProcessLogListener,
signal: AbortSignal,
): Promise<void> {
const reader = body.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (!signal.aborted) {
const { done, value } = await reader.read();
if (done) {
return;
}
buffer += decoder.decode(value, { stream: true }).replace(/\r\n/g, "\n");
let separatorIndex = buffer.indexOf("\n\n");
while (separatorIndex !== -1) {
const chunk = buffer.slice(0, separatorIndex);
buffer = buffer.slice(separatorIndex + 2);
const entry = parseProcessLogSseChunk(chunk);
if (entry) {
listener(entry);
}
separatorIndex = buffer.indexOf("\n\n");
}
}
} catch (error) {
if (signal.aborted || isAbortError(error)) {
return;
}
throw error;
} finally {
reader.releaseLock();
}
}
function parseProcessLogSseChunk(chunk: string): ProcessLogEntry | null {
if (!chunk.trim()) {
return null;
}
let eventName = "message";
const dataLines: string[] = [];
for (const line of chunk.split("\n")) {
if (!line || line.startsWith(":")) {
continue;
}
if (line.startsWith("event:")) {
eventName = line.slice(6).trim();
continue;
}
if (line.startsWith("data:")) {
dataLines.push(line.slice(5).trimStart());
}
}
if (eventName !== "log") {
return null;
}
const data = dataLines.join("\n");
if (!data.trim()) {
return null;
}
return JSON.parse(data) as ProcessLogEntry;
}
function toWebSocketUrl(url: string): string {
const parsed = new URL(url);
if (parsed.protocol === "http:") {
parsed.protocol = "ws:";
} else if (parsed.protocol === "https:") {
parsed.protocol = "wss:";
}
return parsed.toString();
}
function isAbortError(error: unknown): boolean {
return error instanceof Error && error.name === "AbortError";
}

View file

@ -57,6 +57,39 @@ export interface paths {
"/v1/health": {
get: operations["get_v1_health"];
};
"/v1/processes": {
get: operations["get_v1_processes"];
post: operations["post_v1_processes"];
};
"/v1/processes/config": {
get: operations["get_v1_processes_config"];
post: operations["post_v1_processes_config"];
};
"/v1/processes/run": {
post: operations["post_v1_processes_run"];
};
"/v1/processes/{id}": {
get: operations["get_v1_process"];
delete: operations["delete_v1_process"];
};
"/v1/processes/{id}/input": {
post: operations["post_v1_process_input"];
};
"/v1/processes/{id}/kill": {
post: operations["post_v1_process_kill"];
};
"/v1/processes/{id}/logs": {
get: operations["get_v1_process_logs"];
};
"/v1/processes/{id}/stop": {
post: operations["post_v1_process_stop"];
};
"/v1/processes/{id}/terminal/resize": {
post: operations["post_v1_process_terminal_resize"];
};
"/v1/processes/{id}/terminal/ws": {
get: operations["get_v1_process_terminal_ws"];
};
}
export type webhooks = Record<string, never>;
@ -230,6 +263,116 @@ export interface components {
type: string;
[key: string]: unknown;
};
ProcessConfig: {
/** Format: int64 */
defaultRunTimeoutMs: number;
maxConcurrentProcesses: number;
maxInputBytesPerRequest: number;
maxLogBytesPerProcess: number;
maxOutputBytes: number;
/** Format: int64 */
maxRunTimeoutMs: number;
};
ProcessCreateRequest: {
args?: string[];
command: string;
cwd?: string | null;
env?: {
[key: string]: string;
};
interactive?: boolean;
tty?: boolean;
};
ProcessInfo: {
args: string[];
command: string;
/** Format: int64 */
createdAtMs: number;
cwd?: string | null;
/** Format: int32 */
exitCode?: number | null;
/** Format: int64 */
exitedAtMs?: number | null;
id: string;
interactive: boolean;
/** Format: int32 */
pid?: number | null;
status: components["schemas"]["ProcessState"];
tty: boolean;
};
ProcessInputRequest: {
data: string;
encoding?: string | null;
};
ProcessInputResponse: {
bytesWritten: number;
};
ProcessListResponse: {
processes: components["schemas"]["ProcessInfo"][];
};
ProcessLogEntry: {
data: string;
encoding: string;
/** Format: int64 */
sequence: number;
stream: components["schemas"]["ProcessLogsStream"];
/** Format: int64 */
timestampMs: number;
};
ProcessLogsQuery: {
follow?: boolean | null;
/** Format: int64 */
since?: number | null;
stream?: components["schemas"]["ProcessLogsStream"] | null;
tail?: number | null;
};
ProcessLogsResponse: {
entries: components["schemas"]["ProcessLogEntry"][];
processId: string;
stream: components["schemas"]["ProcessLogsStream"];
};
/** @enum {string} */
ProcessLogsStream: "stdout" | "stderr" | "combined" | "pty";
ProcessRunRequest: {
args?: string[];
command: string;
cwd?: string | null;
env?: {
[key: string]: string;
};
maxOutputBytes?: number | null;
/** Format: int64 */
timeoutMs?: number | null;
};
ProcessRunResponse: {
/** Format: int64 */
durationMs: number;
/** Format: int32 */
exitCode?: number | null;
stderr: string;
stderrTruncated: boolean;
stdout: string;
stdoutTruncated: boolean;
timedOut: boolean;
};
ProcessSignalQuery: {
/** Format: int64 */
waitMs?: number | null;
};
/** @enum {string} */
ProcessState: "running" | "exited";
ProcessTerminalResizeRequest: {
/** Format: int32 */
cols: number;
/** Format: int32 */
rows: number;
};
ProcessTerminalResizeResponse: {
/** Format: int32 */
cols: number;
/** Format: int32 */
rows: number;
};
/** @enum {string} */
ServerStatus: "running" | "stopped";
ServerStatusInfo: {
@ -748,4 +891,417 @@ export interface operations {
};
};
};
get_v1_processes: {
responses: {
/** @description List processes */
200: {
content: {
"application/json": components["schemas"]["ProcessListResponse"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
post_v1_processes: {
requestBody: {
content: {
"application/json": components["schemas"]["ProcessCreateRequest"];
};
};
responses: {
/** @description Started process */
200: {
content: {
"application/json": components["schemas"]["ProcessInfo"];
};
};
/** @description Invalid request */
400: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process limit or state conflict */
409: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
get_v1_processes_config: {
responses: {
/** @description Current runtime process config */
200: {
content: {
"application/json": components["schemas"]["ProcessConfig"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
post_v1_processes_config: {
requestBody: {
content: {
"application/json": components["schemas"]["ProcessConfig"];
};
};
responses: {
/** @description Updated runtime process config */
200: {
content: {
"application/json": components["schemas"]["ProcessConfig"];
};
};
/** @description Invalid config */
400: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
post_v1_processes_run: {
requestBody: {
content: {
"application/json": components["schemas"]["ProcessRunRequest"];
};
};
responses: {
/** @description One-off command result */
200: {
content: {
"application/json": components["schemas"]["ProcessRunResponse"];
};
};
/** @description Invalid request */
400: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
get_v1_process: {
parameters: {
path: {
/** @description Process ID */
id: string;
};
};
responses: {
/** @description Process details */
200: {
content: {
"application/json": components["schemas"]["ProcessInfo"];
};
};
/** @description Unknown process */
404: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
delete_v1_process: {
parameters: {
path: {
/** @description Process ID */
id: string;
};
};
responses: {
/** @description Process deleted */
204: {
content: never;
};
/** @description Unknown process */
404: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process is still running */
409: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
post_v1_process_input: {
parameters: {
path: {
/** @description Process ID */
id: string;
};
};
requestBody: {
content: {
"application/json": components["schemas"]["ProcessInputRequest"];
};
};
responses: {
/** @description Input accepted */
200: {
content: {
"application/json": components["schemas"]["ProcessInputResponse"];
};
};
/** @description Invalid request */
400: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process not writable */
409: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Input exceeds configured limit */
413: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
post_v1_process_kill: {
parameters: {
query?: {
/** @description Wait up to N ms for process to exit */
waitMs?: number | null;
};
path: {
/** @description Process ID */
id: string;
};
};
responses: {
/** @description Kill signal sent */
200: {
content: {
"application/json": components["schemas"]["ProcessInfo"];
};
};
/** @description Unknown process */
404: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
get_v1_process_logs: {
parameters: {
query?: {
/** @description stdout|stderr|combined|pty */
stream?: components["schemas"]["ProcessLogsStream"] | null;
/** @description Tail N entries */
tail?: number | null;
/** @description Follow via SSE */
follow?: boolean | null;
/** @description Only entries with sequence greater than this */
since?: number | null;
};
path: {
/** @description Process ID */
id: string;
};
};
responses: {
/** @description Process logs */
200: {
content: {
"application/json": components["schemas"]["ProcessLogsResponse"];
};
};
/** @description Unknown process */
404: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
post_v1_process_stop: {
parameters: {
query?: {
/** @description Wait up to N ms for process to exit */
waitMs?: number | null;
};
path: {
/** @description Process ID */
id: string;
};
};
responses: {
/** @description Stop signal sent */
200: {
content: {
"application/json": components["schemas"]["ProcessInfo"];
};
};
/** @description Unknown process */
404: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
post_v1_process_terminal_resize: {
parameters: {
path: {
/** @description Process ID */
id: string;
};
};
requestBody: {
content: {
"application/json": components["schemas"]["ProcessTerminalResizeRequest"];
};
};
responses: {
/** @description Resize accepted */
200: {
content: {
"application/json": components["schemas"]["ProcessTerminalResizeResponse"];
};
};
/** @description Invalid request */
400: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Unknown process */
404: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Not a terminal process */
409: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
get_v1_process_terminal_ws: {
parameters: {
query?: {
/** @description Bearer token alternative for WS auth */
access_token?: string | null;
};
path: {
/** @description Process ID */
id: string;
};
};
responses: {
/** @description WebSocket upgraded */
101: {
content: never;
};
/** @description Invalid websocket frame or upgrade request */
400: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Unknown process */
404: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Not a terminal process */
409: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
/** @description Process API unsupported on this platform */
501: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
}

View file

@ -10,6 +10,12 @@ export { AcpRpcError } from "acp-http-client";
export { buildInspectorUrl } from "./inspector.ts";
export type {
AgentQueryOptions,
ProcessLogFollowQuery,
ProcessLogListener,
ProcessLogSubscription,
ProcessTerminalConnectOptions,
ProcessTerminalWebSocketUrlOptions,
SandboxAgentConnectOptions,
SandboxAgentStartOptions,
SessionCreateRequest,
@ -29,6 +35,7 @@ export type {
AcpServerInfo,
AcpServerListResponse,
AgentInfo,
AgentQuery,
AgentInstallRequest,
AgentInstallResponse,
AgentListResponse,
@ -51,6 +58,27 @@ export type {
McpConfigQuery,
McpServerConfig,
ProblemDetails,
ProcessConfig,
ProcessCreateRequest,
ProcessInfo,
ProcessInputRequest,
ProcessInputResponse,
ProcessListResponse,
ProcessLogEntry,
ProcessLogsQuery,
ProcessLogsResponse,
ProcessLogsStream,
ProcessRunRequest,
ProcessRunResponse,
ProcessSignalQuery,
ProcessState,
ProcessTerminalClientFrame,
ProcessTerminalErrorFrame,
ProcessTerminalExitFrame,
ProcessTerminalReadyFrame,
ProcessTerminalResizeRequest,
ProcessTerminalResizeResponse,
ProcessTerminalServerFrame,
SessionEvent,
SessionPersistDriver,
SessionRecord,

View file

@ -6,6 +6,7 @@ export type ProblemDetails = components["schemas"]["ProblemDetails"];
export type HealthResponse = JsonResponse<operations["get_v1_health"], 200>;
export type AgentListResponse = JsonResponse<operations["get_v1_agents"], 200>;
export type AgentInfo = components["schemas"]["AgentInfo"];
export type AgentQuery = QueryParams<operations["get_v1_agents"]>;
export type AgentInstallRequest = JsonRequestBody<operations["post_v1_agent_install"]>;
export type AgentInstallResponse = JsonResponse<operations["post_v1_agent_install"], 200>;
@ -31,6 +32,58 @@ export type McpServerConfig = components["schemas"]["McpServerConfig"];
export type SkillsConfigQuery = QueryParams<operations["get_v1_config_skills"]>;
export type SkillsConfig = components["schemas"]["SkillsConfig"];
export type ProcessConfig = JsonResponse<operations["get_v1_processes_config"], 200>;
export type ProcessCreateRequest = JsonRequestBody<operations["post_v1_processes"]>;
export type ProcessInfo = components["schemas"]["ProcessInfo"];
export type ProcessInputRequest = JsonRequestBody<operations["post_v1_process_input"]>;
export type ProcessInputResponse = JsonResponse<operations["post_v1_process_input"], 200>;
export type ProcessListResponse = JsonResponse<operations["get_v1_processes"], 200>;
export type ProcessLogEntry = components["schemas"]["ProcessLogEntry"];
export type ProcessLogsQuery = QueryParams<operations["get_v1_process_logs"]>;
export type ProcessLogsResponse = JsonResponse<operations["get_v1_process_logs"], 200>;
export type ProcessLogsStream = components["schemas"]["ProcessLogsStream"];
export type ProcessRunRequest = JsonRequestBody<operations["post_v1_processes_run"]>;
export type ProcessRunResponse = JsonResponse<operations["post_v1_processes_run"], 200>;
export type ProcessSignalQuery = QueryParams<operations["post_v1_process_stop"]>;
export type ProcessState = components["schemas"]["ProcessState"];
export type ProcessTerminalResizeRequest = JsonRequestBody<operations["post_v1_process_terminal_resize"]>;
export type ProcessTerminalResizeResponse = JsonResponse<operations["post_v1_process_terminal_resize"], 200>;
export type ProcessTerminalClientFrame =
| {
type: "input";
data: string;
encoding?: string;
}
| {
type: "resize";
cols: number;
rows: number;
}
| {
type: "close";
};
export interface ProcessTerminalReadyFrame {
type: "ready";
processId: string;
}
export interface ProcessTerminalExitFrame {
type: "exit";
exitCode?: number | null;
}
export interface ProcessTerminalErrorFrame {
type: "error";
message: string;
}
export type ProcessTerminalServerFrame =
| ProcessTerminalReadyFrame
| ProcessTerminalExitFrame
| ProcessTerminalErrorFrame;
export interface SessionRecord {
id: string;
agent: string;

View file

@ -12,6 +12,7 @@ import {
} from "../src/index.ts";
import { spawnSandboxAgent, isNodeRuntime, type SandboxAgentSpawnHandle } from "../src/spawn.ts";
import { prepareMockAgentDataHome } from "./helpers/mock-agent.ts";
import WebSocket from "ws";
const __dirname = dirname(fileURLToPath(import.meta.url));
@ -64,6 +65,107 @@ async function waitFor<T>(
throw new Error("timed out waiting for condition");
}
async function waitForAsync<T>(
fn: () => Promise<T | undefined | null>,
timeoutMs = 6000,
stepMs = 30,
): Promise<T> {
const started = Date.now();
while (Date.now() - started < timeoutMs) {
const value = await fn();
if (value !== undefined && value !== null) {
return value;
}
await sleep(stepMs);
}
throw new Error("timed out waiting for condition");
}
function buildTarArchive(entries: Array<{ name: string; content: string }>): Uint8Array {
const blocks: Buffer[] = [];
for (const entry of entries) {
const content = Buffer.from(entry.content, "utf8");
const header = Buffer.alloc(512, 0);
writeTarString(header, 0, 100, entry.name);
writeTarOctal(header, 100, 8, 0o644);
writeTarOctal(header, 108, 8, 0);
writeTarOctal(header, 116, 8, 0);
writeTarOctal(header, 124, 12, content.length);
writeTarOctal(header, 136, 12, Math.floor(Date.now() / 1000));
header.fill(0x20, 148, 156);
header[156] = "0".charCodeAt(0);
writeTarString(header, 257, 6, "ustar");
writeTarString(header, 263, 2, "00");
let checksum = 0;
for (const byte of header) {
checksum += byte;
}
writeTarChecksum(header, checksum);
blocks.push(header);
blocks.push(content);
const remainder = content.length % 512;
if (remainder !== 0) {
blocks.push(Buffer.alloc(512 - remainder, 0));
}
}
blocks.push(Buffer.alloc(1024, 0));
return Buffer.concat(blocks);
}
function writeTarString(buffer: Buffer, offset: number, length: number, value: string): void {
const bytes = Buffer.from(value, "utf8");
bytes.copy(buffer, offset, 0, Math.min(bytes.length, length));
}
function writeTarOctal(buffer: Buffer, offset: number, length: number, value: number): void {
const rendered = value.toString(8).padStart(length - 1, "0");
writeTarString(buffer, offset, length, rendered);
buffer[offset + length - 1] = 0;
}
function writeTarChecksum(buffer: Buffer, checksum: number): void {
const rendered = checksum.toString(8).padStart(6, "0");
writeTarString(buffer, 148, 6, rendered);
buffer[154] = 0;
buffer[155] = 0x20;
}
function decodeSocketPayload(data: unknown): string {
if (typeof data === "string") {
return data;
}
if (data instanceof ArrayBuffer) {
return Buffer.from(data).toString("utf8");
}
if (ArrayBuffer.isView(data)) {
return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString("utf8");
}
if (typeof Blob !== "undefined" && data instanceof Blob) {
throw new Error("Blob socket payloads are not supported in this test");
}
throw new Error(`Unsupported socket payload type: ${typeof data}`);
}
function decodeProcessLogData(data: string, encoding: string): string {
if (encoding === "base64") {
return Buffer.from(data, "base64").toString("utf8");
}
return data;
}
function nodeCommand(source: string): { command: string; args: string[] } {
return {
command: process.execPath,
args: ["-e", source],
};
}
describe("Integration: TypeScript SDK flat session API", () => {
let handle: SandboxAgentSpawnHandle;
let baseUrl: string;
@ -122,6 +224,9 @@ describe("Integration: TypeScript SDK flat session API", () => {
const fetched = await sdk.getSession(session.id);
expect(fetched?.agent).toBe("mock");
const acpServers = await sdk.listAcpServers();
expect(acpServers.servers.some((server) => server.agent === "mock")).toBe(true);
const events = await sdk.getEvents({ sessionId: session.id, limit: 100 });
expect(events.items.length).toBeGreaterThan(0);
expect(events.items.some((event) => event.sender === "client")).toBe(true);
@ -137,6 +242,64 @@ describe("Integration: TypeScript SDK flat session API", () => {
await sdk.dispose();
});
it("covers agent query flags and filesystem HTTP helpers", async () => {
const sdk = await SandboxAgent.connect({
baseUrl,
token,
});
const directory = mkdtempSync(join(tmpdir(), "sdk-fs-"));
const nestedDir = join(directory, "nested");
const filePath = join(directory, "notes.txt");
const movedPath = join(directory, "notes-moved.txt");
const uploadDir = join(directory, "uploaded");
try {
const listedAgents = await sdk.listAgents({ config: true, noCache: true });
expect(listedAgents.agents.some((agent) => agent.id === "mock")).toBe(true);
const mockAgent = await sdk.getAgent("mock", { config: true, noCache: true });
expect(mockAgent.id).toBe("mock");
expect(Array.isArray(mockAgent.configOptions)).toBe(true);
await sdk.mkdirFs({ path: nestedDir });
await sdk.writeFsFile({ path: filePath }, "hello from sdk");
const bytes = await sdk.readFsFile({ path: filePath });
expect(new TextDecoder().decode(bytes)).toBe("hello from sdk");
const stat = await sdk.statFs({ path: filePath });
expect(stat.path).toBe(filePath);
expect(stat.size).toBe(bytes.byteLength);
const entries = await sdk.listFsEntries({ path: directory });
expect(entries.some((entry) => entry.path === nestedDir)).toBe(true);
expect(entries.some((entry) => entry.path === filePath)).toBe(true);
const moved = await sdk.moveFs({
from: filePath,
to: movedPath,
overwrite: true,
});
expect(moved.to).toBe(movedPath);
const uploadResult = await sdk.uploadFsBatch(
buildTarArchive([{ name: "batch.txt", content: "batch upload works" }]),
{ path: uploadDir },
);
expect(uploadResult.paths.some((path) => path.endsWith("batch.txt"))).toBe(true);
const uploaded = await sdk.readFsFile({ path: join(uploadDir, "batch.txt") });
expect(new TextDecoder().decode(uploaded)).toBe("batch upload works");
const deleted = await sdk.deleteFsEntry({ path: movedPath });
expect(deleted.path).toBe(movedPath);
} finally {
rmSync(directory, { recursive: true, force: true });
await sdk.dispose();
}
});
it("uses custom fetch for both HTTP helpers and ACP session traffic", async () => {
const defaultFetch = globalThis.fetch;
if (!defaultFetch) {
@ -168,7 +331,7 @@ describe("Integration: TypeScript SDK flat session API", () => {
expect(seenPaths.some((path) => path.startsWith("/v1/acp/"))).toBe(true);
await sdk.dispose();
});
}, 60_000);
it("requires baseUrl when fetch is not provided", async () => {
await expect(SandboxAgent.connect({ token } as any)).rejects.toThrow(
@ -320,4 +483,186 @@ describe("Integration: TypeScript SDK flat session API", () => {
await sdk.dispose();
rmSync(directory, { recursive: true, force: true });
});
it("covers process runtime HTTP helpers, log streaming, and terminal websocket access", async () => {
const sdk = await SandboxAgent.connect({
baseUrl,
token,
});
const originalConfig = await sdk.getProcessConfig();
const updatedConfig = await sdk.setProcessConfig({
...originalConfig,
maxOutputBytes: originalConfig.maxOutputBytes + 1,
});
expect(updatedConfig.maxOutputBytes).toBe(originalConfig.maxOutputBytes + 1);
const runResult = await sdk.runProcess({
...nodeCommand("process.stdout.write('run-stdout'); process.stderr.write('run-stderr');"),
timeoutMs: 5_000,
});
expect(runResult.stdout).toContain("run-stdout");
expect(runResult.stderr).toContain("run-stderr");
let interactiveProcessId: string | undefined;
let ttyProcessId: string | undefined;
let killProcessId: string | undefined;
try {
const interactiveProcess = await sdk.createProcess({
...nodeCommand(`
process.stdin.setEncoding("utf8");
process.stdout.write("ready\\n");
process.stdin.on("data", (chunk) => {
process.stdout.write("echo:" + chunk);
});
setInterval(() => {}, 1_000);
`),
interactive: true,
});
interactiveProcessId = interactiveProcess.id;
const listed = await sdk.listProcesses();
expect(listed.processes.some((process) => process.id === interactiveProcess.id)).toBe(true);
const fetched = await sdk.getProcess(interactiveProcess.id);
expect(fetched.status).toBe("running");
const initialLogs = await waitForAsync(async () => {
const logs = await sdk.getProcessLogs(interactiveProcess.id, { tail: 10 });
return logs.entries.some((entry) => decodeProcessLogData(entry.data, entry.encoding).includes("ready"))
? logs
: undefined;
});
expect(
initialLogs.entries.some((entry) => decodeProcessLogData(entry.data, entry.encoding).includes("ready")),
).toBe(true);
const followedLogs: string[] = [];
const subscription = await sdk.followProcessLogs(
interactiveProcess.id,
(entry) => {
followedLogs.push(decodeProcessLogData(entry.data, entry.encoding));
},
{ tail: 1 },
);
try {
const inputResult = await sdk.sendProcessInput(interactiveProcess.id, {
data: Buffer.from("hello over stdin\n", "utf8").toString("base64"),
encoding: "base64",
});
expect(inputResult.bytesWritten).toBeGreaterThan(0);
await waitFor(() => {
const joined = followedLogs.join("");
return joined.includes("echo:hello over stdin") ? joined : undefined;
});
} finally {
subscription.close();
await subscription.closed;
}
const stopped = await sdk.stopProcess(interactiveProcess.id, { waitMs: 5_000 });
expect(stopped.status).toBe("exited");
await sdk.deleteProcess(interactiveProcess.id);
interactiveProcessId = undefined;
const ttyProcess = await sdk.createProcess({
...nodeCommand(`
process.stdin.setEncoding("utf8");
process.stdin.on("data", (chunk) => {
process.stdout.write(chunk);
});
setInterval(() => {}, 1_000);
`),
interactive: true,
tty: true,
});
ttyProcessId = ttyProcess.id;
const resized = await sdk.resizeProcessTerminal(ttyProcess.id, {
cols: 120,
rows: 40,
});
expect(resized.cols).toBe(120);
expect(resized.rows).toBe(40);
const wsUrl = sdk.buildProcessTerminalWebSocketUrl(ttyProcess.id);
expect(wsUrl.startsWith("ws://") || wsUrl.startsWith("wss://")).toBe(true);
const ws = sdk.connectProcessTerminalWebSocket(ttyProcess.id, {
WebSocket: WebSocket as unknown as typeof globalThis.WebSocket,
});
ws.binaryType = "arraybuffer";
const socketTextFrames: string[] = [];
const socketBinaryFrames: string[] = [];
ws.addEventListener("message", (event) => {
if (typeof event.data === "string") {
socketTextFrames.push(event.data);
return;
}
socketBinaryFrames.push(decodeSocketPayload(event.data));
});
await waitFor(() => {
const ready = socketTextFrames.find((frame) => frame.includes('"type":"ready"'));
return ready;
});
ws.send(JSON.stringify({
type: "input",
data: "hello tty\n",
}));
await waitFor(() => {
const joined = socketBinaryFrames.join("");
return joined.includes("hello tty") ? joined : undefined;
});
ws.close();
await waitForAsync(async () => {
const processInfo = await sdk.getProcess(ttyProcess.id);
return processInfo.status === "running" ? processInfo : undefined;
});
const killedTty = await sdk.killProcess(ttyProcess.id, { waitMs: 5_000 });
expect(killedTty.status).toBe("exited");
await sdk.deleteProcess(ttyProcess.id);
ttyProcessId = undefined;
const killProcess = await sdk.createProcess({
...nodeCommand("setInterval(() => {}, 1_000);"),
});
killProcessId = killProcess.id;
const killed = await sdk.killProcess(killProcess.id, { waitMs: 5_000 });
expect(killed.status).toBe("exited");
await sdk.deleteProcess(killProcess.id);
killProcessId = undefined;
} finally {
await sdk.setProcessConfig(originalConfig);
if (interactiveProcessId) {
await sdk.killProcess(interactiveProcessId, { waitMs: 5_000 }).catch(() => {});
await sdk.deleteProcess(interactiveProcessId).catch(() => {});
}
if (ttyProcessId) {
await sdk.killProcess(ttyProcessId, { waitMs: 5_000 }).catch(() => {});
await sdk.deleteProcess(ttyProcessId).catch(() => {});
}
if (killProcessId) {
await sdk.killProcess(killProcessId, { waitMs: 5_000 }).catch(() => {});
await sdk.deleteProcess(killProcessId).catch(() => {});
}
await sdk.dispose();
}
});
});

View file

@ -1,17 +1,17 @@
# Server Instructions
## ACP v2 Architecture
## Architecture
- Public API routes are defined in `server/packages/sandbox-agent/src/router.rs`.
- ACP runtime/process bridge is in `server/packages/sandbox-agent/src/acp_runtime.rs`.
- `/v2` is the only active API surface for sessions/prompts (`/v2/rpc`).
- ACP proxy runtime is in `server/packages/sandbox-agent/src/acp_proxy_runtime.rs`.
- All API endpoints are under `/v1`.
- Keep binary filesystem transfer endpoints as dedicated HTTP APIs:
- `GET /v2/fs/file`
- `PUT /v2/fs/file`
- `POST /v2/fs/upload-batch`
- `GET /v1/fs/file`
- `PUT /v1/fs/file`
- `POST /v1/fs/upload-batch`
- Rationale: host-owned cross-agent-consistent behavior and large binary transfer needs that ACP JSON-RPC is not suited to stream efficiently.
- Maintain ACP variants in parallel only when they share the same underlying filesystem implementation; SDK defaults should still prefer HTTP for large/binary transfers.
- `/v1/*` must remain hard-removed (`410`) and `/opencode/*` stays disabled (`503`) until Phase 7.
- `/opencode/*` stays disabled (`503`) until Phase 7.
- Agent install logic (native + ACP agent process + lazy install) is handled by `server/packages/agent-management/`.
## API Contract Rules
@ -23,14 +23,14 @@
## Tests
Primary v2 integration coverage:
- `server/packages/sandbox-agent/tests/v2_api.rs`
- `server/packages/sandbox-agent/tests/v2_agent_process_matrix.rs`
Primary v1 integration coverage:
- `server/packages/sandbox-agent/tests/v1_api.rs`
- `server/packages/sandbox-agent/tests/v1_agent_process_matrix.rs`
Run:
```bash
cargo test -p sandbox-agent --test v2_api
cargo test -p sandbox-agent --test v2_agent_process_matrix
cargo test -p sandbox-agent --test v1_api
cargo test -p sandbox-agent --test v1_agent_process_matrix
```
## Migration Docs Sync

View file

@ -17,6 +17,7 @@ pub enum ErrorType {
PermissionDenied,
NotAcceptable,
UnsupportedMediaType,
NotFound,
SessionNotFound,
SessionAlreadyExists,
ModeNotSupported,
@ -37,6 +38,7 @@ impl ErrorType {
Self::PermissionDenied => "urn:sandbox-agent:error:permission_denied",
Self::NotAcceptable => "urn:sandbox-agent:error:not_acceptable",
Self::UnsupportedMediaType => "urn:sandbox-agent:error:unsupported_media_type",
Self::NotFound => "urn:sandbox-agent:error:not_found",
Self::SessionNotFound => "urn:sandbox-agent:error:session_not_found",
Self::SessionAlreadyExists => "urn:sandbox-agent:error:session_already_exists",
Self::ModeNotSupported => "urn:sandbox-agent:error:mode_not_supported",
@ -57,6 +59,7 @@ impl ErrorType {
Self::PermissionDenied => "Permission Denied",
Self::NotAcceptable => "Not Acceptable",
Self::UnsupportedMediaType => "Unsupported Media Type",
Self::NotFound => "Not Found",
Self::SessionNotFound => "Session Not Found",
Self::SessionAlreadyExists => "Session Already Exists",
Self::ModeNotSupported => "Mode Not Supported",
@ -77,6 +80,7 @@ impl ErrorType {
Self::PermissionDenied => 403,
Self::NotAcceptable => 406,
Self::UnsupportedMediaType => 415,
Self::NotFound => 404,
Self::SessionNotFound => 404,
Self::SessionAlreadyExists => 409,
Self::ModeNotSupported => 400,
@ -155,6 +159,8 @@ pub enum SandboxError {
NotAcceptable { message: String },
#[error("unsupported media type: {message}")]
UnsupportedMediaType { message: String },
#[error("not found: {resource} {id}")]
NotFound { resource: String, id: String },
#[error("session not found: {session_id}")]
SessionNotFound { session_id: String },
#[error("session already exists: {session_id}")]
@ -180,6 +186,7 @@ impl SandboxError {
Self::PermissionDenied { .. } => ErrorType::PermissionDenied,
Self::NotAcceptable { .. } => ErrorType::NotAcceptable,
Self::UnsupportedMediaType { .. } => ErrorType::UnsupportedMediaType,
Self::NotFound { .. } => ErrorType::NotFound,
Self::SessionNotFound { .. } => ErrorType::SessionNotFound,
Self::SessionAlreadyExists { .. } => ErrorType::SessionAlreadyExists,
Self::ModeNotSupported { .. } => ErrorType::ModeNotSupported,
@ -264,6 +271,12 @@ impl SandboxError {
map.insert("message".to_string(), Value::String(message.clone()));
(None, None, Some(Value::Object(map)))
}
Self::NotFound { resource, id } => {
let mut map = Map::new();
map.insert("resource".to_string(), Value::String(resource.clone()));
map.insert("id".to_string(), Value::String(id.clone()));
(None, None, Some(Value::Object(map)))
}
Self::SessionNotFound { session_id } => (None, Some(session_id.clone()), None),
Self::SessionAlreadyExists { session_id } => (None, Some(session_id.clone()), None),
Self::ModeNotSupported { agent, mode } => {

View file

@ -55,6 +55,7 @@ insta.workspace = true
tower.workspace = true
tempfile.workspace = true
serial_test = "3.2"
tokio-tungstenite = "0.24"
[features]
test-utils = ["tempfile"]

View file

@ -3,6 +3,7 @@
mod acp_proxy_runtime;
pub mod cli;
pub mod daemon;
mod process_runtime;
pub mod router;
pub mod server_logs;
pub mod telemetry;

File diff suppressed because it is too large Load diff

View file

@ -1,4 +1,5 @@
use std::collections::{BTreeMap, HashMap};
use std::convert::Infallible;
use std::fs;
use std::io::Cursor;
use std::path::{Path as StdPath, PathBuf};
@ -6,6 +7,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use axum::body::Bytes;
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::extract::{Path, Query, State};
use axum::http::{header, HeaderMap, Request, StatusCode};
use axum::middleware::Next;
@ -13,6 +15,8 @@ use axum::response::sse::KeepAlive;
use axum::response::{IntoResponse, Response, Sse};
use axum::routing::{delete, get, post};
use axum::{Json, Router};
use futures::stream;
use futures::StreamExt;
use sandbox_agent_agent_management::agents::{
AgentId, AgentManager, InstallOptions, InstallResult, InstallSource, InstalledArtifactKind,
};
@ -27,11 +31,16 @@ use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tar::Archive;
use tokio_stream::wrappers::BroadcastStream;
use tower_http::trace::TraceLayer;
use tracing::Span;
use utoipa::{Modify, OpenApi, ToSchema};
use crate::acp_proxy_runtime::{AcpProxyRuntime, ProxyPostOutcome};
use crate::process_runtime::{
decode_input_bytes, ProcessLogFilter, ProcessLogFilterStream, ProcessRuntime,
ProcessRuntimeConfig, ProcessSnapshot, ProcessStartSpec, ProcessStatus, ProcessStream, RunSpec,
};
use crate::ui;
mod support;
@ -77,6 +86,7 @@ pub struct AppState {
agent_manager: Arc<AgentManager>,
acp_proxy: Arc<AcpProxyRuntime>,
opencode_server_manager: Arc<OpenCodeServerManager>,
process_runtime: Arc<ProcessRuntime>,
pub(crate) branding: BrandingMode,
version_cache: Mutex<HashMap<AgentId, CachedAgentVersion>>,
}
@ -100,11 +110,13 @@ impl AppState {
auto_restart: true,
},
));
let process_runtime = Arc::new(ProcessRuntime::new());
Self {
auth,
agent_manager,
acp_proxy,
opencode_server_manager,
process_runtime,
branding,
version_cache: Mutex::new(HashMap::new()),
}
@ -122,6 +134,10 @@ impl AppState {
self.opencode_server_manager.clone()
}
pub(crate) fn process_runtime(&self) -> Arc<ProcessRuntime> {
self.process_runtime.clone()
}
pub(crate) fn purge_version_cache(&self, agent: AgentId) {
self.version_cache.lock().unwrap().remove(&agent);
}
@ -166,6 +182,28 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
.route("/fs/move", post(post_v1_fs_move))
.route("/fs/stat", get(get_v1_fs_stat))
.route("/fs/upload-batch", post(post_v1_fs_upload_batch))
.route(
"/processes/config",
get(get_v1_processes_config).post(post_v1_processes_config),
)
.route("/processes", get(get_v1_processes).post(post_v1_processes))
.route("/processes/run", post(post_v1_processes_run))
.route(
"/processes/:id",
get(get_v1_process).delete(delete_v1_process),
)
.route("/processes/:id/stop", post(post_v1_process_stop))
.route("/processes/:id/kill", post(post_v1_process_kill))
.route("/processes/:id/logs", get(get_v1_process_logs))
.route("/processes/:id/input", post(post_v1_process_input))
.route(
"/processes/:id/terminal/resize",
post(post_v1_process_terminal_resize),
)
.route(
"/processes/:id/terminal/ws",
get(get_v1_process_terminal_ws),
)
.route(
"/config/mcp",
get(get_v1_config_mcp)
@ -295,6 +333,19 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
post_v1_fs_move,
get_v1_fs_stat,
post_v1_fs_upload_batch,
get_v1_processes_config,
post_v1_processes_config,
post_v1_processes,
post_v1_processes_run,
get_v1_processes,
get_v1_process,
post_v1_process_stop,
post_v1_process_kill,
delete_v1_process,
get_v1_process_logs,
post_v1_process_input,
post_v1_process_terminal_resize,
get_v1_process_terminal_ws,
get_v1_config_mcp,
put_v1_config_mcp,
delete_v1_config_mcp,
@ -329,6 +380,22 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
FsMoveResponse,
FsActionResponse,
FsUploadBatchResponse,
ProcessConfig,
ProcessCreateRequest,
ProcessRunRequest,
ProcessRunResponse,
ProcessState,
ProcessInfo,
ProcessListResponse,
ProcessLogsStream,
ProcessLogsQuery,
ProcessLogEntry,
ProcessLogsResponse,
ProcessInputRequest,
ProcessInputResponse,
ProcessSignalQuery,
ProcessTerminalResizeRequest,
ProcessTerminalResizeResponse,
AcpPostQuery,
AcpServerInfo,
AcpServerListResponse,
@ -361,12 +428,21 @@ impl Modify for ServerAddon {
pub enum ApiError {
#[error(transparent)]
Sandbox(#[from] SandboxError),
#[error("problem: {0:?}")]
Problem(ProblemDetails),
}
impl From<ProblemDetails> for ApiError {
fn from(value: ProblemDetails) -> Self {
Self::Problem(value)
}
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
let problem = match &self {
ApiError::Sandbox(error) => problem_from_sandbox_error(error),
ApiError::Problem(problem) => problem.clone(),
};
let status =
StatusCode::from_u16(problem.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
@ -1075,6 +1151,678 @@ async fn post_v1_fs_upload_batch(
}))
}
/// Get process runtime configuration.
///
/// Returns the current runtime configuration for the process management API,
/// including limits for concurrency, timeouts, and buffer sizes.
#[utoipa::path(
get,
path = "/v1/processes/config",
tag = "v1",
responses(
(status = 200, description = "Current runtime process config", body = ProcessConfig),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_processes_config(
State(state): State<Arc<AppState>>,
) -> Result<Json<ProcessConfig>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let config = state.process_runtime().get_config().await;
Ok(Json(map_process_config(config)))
}
/// Update process runtime configuration.
///
/// Replaces the runtime configuration for the process management API.
/// Validates that all values are non-zero and clamps default timeout to max.
#[utoipa::path(
post,
path = "/v1/processes/config",
tag = "v1",
request_body = ProcessConfig,
responses(
(status = 200, description = "Updated runtime process config", body = ProcessConfig),
(status = 400, description = "Invalid config", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_processes_config(
State(state): State<Arc<AppState>>,
Json(body): Json<ProcessConfig>,
) -> Result<Json<ProcessConfig>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
let updated = runtime
.set_config(into_runtime_process_config(body))
.await?;
Ok(Json(map_process_config(updated)))
}
/// Create a long-lived managed process.
///
/// Spawns a new process with the given command and arguments. Supports both
/// pipe-based and PTY (tty) modes. Returns the process descriptor on success.
#[utoipa::path(
post,
path = "/v1/processes",
tag = "v1",
request_body = ProcessCreateRequest,
responses(
(status = 200, description = "Started process", body = ProcessInfo),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 409, description = "Process limit or state conflict", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_processes(
State(state): State<Arc<AppState>>,
Json(body): Json<ProcessCreateRequest>,
) -> Result<Json<ProcessInfo>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
let snapshot = runtime
.start_process(ProcessStartSpec {
command: body.command,
args: body.args,
cwd: body.cwd,
env: body.env.into_iter().collect(),
tty: body.tty,
interactive: body.interactive,
})
.await?;
Ok(Json(map_process_snapshot(snapshot)))
}
/// Run a one-shot command.
///
/// Executes a command to completion and returns its stdout, stderr, exit code,
/// and duration. Supports configurable timeout and output size limits.
#[utoipa::path(
post,
path = "/v1/processes/run",
tag = "v1",
request_body = ProcessRunRequest,
responses(
(status = 200, description = "One-off command result", body = ProcessRunResponse),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_processes_run(
State(state): State<Arc<AppState>>,
Json(body): Json<ProcessRunRequest>,
) -> Result<Json<ProcessRunResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
let output = runtime
.run_once(RunSpec {
command: body.command,
args: body.args,
cwd: body.cwd,
env: body.env.into_iter().collect(),
timeout_ms: body.timeout_ms,
max_output_bytes: body.max_output_bytes,
})
.await?;
Ok(Json(ProcessRunResponse {
exit_code: output.exit_code,
timed_out: output.timed_out,
stdout: output.stdout,
stderr: output.stderr,
stdout_truncated: output.stdout_truncated,
stderr_truncated: output.stderr_truncated,
duration_ms: output.duration_ms,
}))
}
/// List all managed processes.
///
/// Returns a list of all processes (running and exited) currently tracked
/// by the runtime, sorted by process ID.
#[utoipa::path(
get,
path = "/v1/processes",
tag = "v1",
responses(
(status = 200, description = "List processes", body = ProcessListResponse),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_processes(
State(state): State<Arc<AppState>>,
) -> Result<Json<ProcessListResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let snapshots = state.process_runtime().list_processes().await;
Ok(Json(ProcessListResponse {
processes: snapshots.into_iter().map(map_process_snapshot).collect(),
}))
}
/// Get a single process by ID.
///
/// Returns the current state of a managed process including its status,
/// PID, exit code, and creation/exit timestamps.
#[utoipa::path(
get,
path = "/v1/processes/{id}",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
responses(
(status = 200, description = "Process details", body = ProcessInfo),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_process(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<Json<ProcessInfo>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let snapshot = state.process_runtime().snapshot(&id).await?;
Ok(Json(map_process_snapshot(snapshot)))
}
/// Send SIGTERM to a process.
///
/// Sends SIGTERM to the process and optionally waits up to `waitMs`
/// milliseconds for the process to exit before returning.
#[utoipa::path(
post,
path = "/v1/processes/{id}/stop",
tag = "v1",
params(
("id" = String, Path, description = "Process ID"),
("waitMs" = Option<u64>, Query, description = "Wait up to N ms for process to exit")
),
responses(
(status = 200, description = "Stop signal sent", body = ProcessInfo),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_stop(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Query(query): Query<ProcessSignalQuery>,
) -> Result<Json<ProcessInfo>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let snapshot = state
.process_runtime()
.stop_process(&id, query.wait_ms)
.await?;
Ok(Json(map_process_snapshot(snapshot)))
}
/// Send SIGKILL to a process.
///
/// Sends SIGKILL to the process and optionally waits up to `waitMs`
/// milliseconds for the process to exit before returning.
#[utoipa::path(
post,
path = "/v1/processes/{id}/kill",
tag = "v1",
params(
("id" = String, Path, description = "Process ID"),
("waitMs" = Option<u64>, Query, description = "Wait up to N ms for process to exit")
),
responses(
(status = 200, description = "Kill signal sent", body = ProcessInfo),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_kill(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Query(query): Query<ProcessSignalQuery>,
) -> Result<Json<ProcessInfo>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let snapshot = state
.process_runtime()
.kill_process(&id, query.wait_ms)
.await?;
Ok(Json(map_process_snapshot(snapshot)))
}
/// Delete a process record.
///
/// Removes a stopped process from the runtime. Returns 409 if the process
/// is still running; stop or kill it first.
#[utoipa::path(
delete,
path = "/v1/processes/{id}",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
responses(
(status = 204, description = "Process deleted"),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 409, description = "Process is still running", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn delete_v1_process(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> Result<StatusCode, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
state.process_runtime().delete_process(&id).await?;
Ok(StatusCode::NO_CONTENT)
}
/// Fetch process logs.
///
/// Returns buffered log entries for a process. Supports filtering by stream
/// type, tail count, and sequence-based resumption. When `follow=true`,
/// returns an SSE stream that replays buffered entries then streams live output.
#[utoipa::path(
get,
path = "/v1/processes/{id}/logs",
tag = "v1",
params(
("id" = String, Path, description = "Process ID"),
("stream" = Option<ProcessLogsStream>, Query, description = "stdout|stderr|combined|pty"),
("tail" = Option<usize>, Query, description = "Tail N entries"),
("follow" = Option<bool>, Query, description = "Follow via SSE"),
("since" = Option<u64>, Query, description = "Only entries with sequence greater than this")
),
responses(
(status = 200, description = "Process logs", body = ProcessLogsResponse),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_process_logs(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
headers: HeaderMap,
Query(query): Query<ProcessLogsQuery>,
) -> Result<Response, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
let default_stream = if runtime.is_tty(&id).await? {
ProcessLogsStream::Pty
} else {
ProcessLogsStream::Combined
};
let requested_stream = query.stream.unwrap_or(default_stream);
let since = match (query.since, parse_last_event_id(&headers)?) {
(Some(query_since), Some(last_event_id)) => Some(query_since.max(last_event_id)),
(Some(query_since), None) => Some(query_since),
(None, Some(last_event_id)) => Some(last_event_id),
(None, None) => None,
};
let filter = ProcessLogFilter {
stream: into_runtime_log_stream(requested_stream),
tail: query.tail,
since,
};
let entries = runtime.logs(&id, filter).await?;
let response_entries: Vec<ProcessLogEntry> =
entries.iter().cloned().map(map_process_log_line).collect();
if query.follow.unwrap_or(false) {
let rx = runtime.subscribe_logs(&id).await?;
let replay_stream = stream::iter(response_entries.into_iter().map(|entry| {
Ok::<axum::response::sse::Event, Infallible>(
axum::response::sse::Event::default()
.event("log")
.id(entry.sequence.to_string())
.data(serde_json::to_string(&entry).unwrap_or_else(|_| "{}".to_string())),
)
}));
let requested_stream_copy = requested_stream;
let follow_stream = BroadcastStream::new(rx).filter_map(move |item| {
let requested_stream_copy = requested_stream_copy;
async move {
match item {
Ok(line) => {
let entry = map_process_log_line(line);
if process_log_matches(&entry, requested_stream_copy) {
Some(Ok(axum::response::sse::Event::default()
.event("log")
.id(entry.sequence.to_string())
.data(
serde_json::to_string(&entry)
.unwrap_or_else(|_| "{}".to_string()),
)))
} else {
None
}
}
Err(_) => None,
}
}
});
let stream = replay_stream.chain(follow_stream);
let response =
Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)));
return Ok(response.into_response());
}
Ok(Json(ProcessLogsResponse {
process_id: id,
stream: requested_stream,
entries: response_entries,
})
.into_response())
}
/// Write input to a process.
///
/// Sends data to a process's stdin (pipe mode) or PTY writer (tty mode).
/// Data can be encoded as base64, utf8, or text. Returns 413 if the decoded
/// payload exceeds the configured `maxInputBytesPerRequest` limit.
#[utoipa::path(
post,
path = "/v1/processes/{id}/input",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
request_body = ProcessInputRequest,
responses(
(status = 200, description = "Input accepted", body = ProcessInputResponse),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 413, description = "Input exceeds configured limit", body = ProblemDetails),
(status = 409, description = "Process not writable", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_input(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(body): Json<ProcessInputRequest>,
) -> Result<Json<ProcessInputResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let encoding = body.encoding.unwrap_or_else(|| "base64".to_string());
let input = decode_input_bytes(&body.data, &encoding)?;
let runtime = state.process_runtime();
let max_input = runtime.max_input_bytes().await;
if input.len() > max_input {
return Err(SandboxError::InvalidRequest {
message: format!("input payload exceeds maxInputBytesPerRequest ({max_input})"),
}
.into());
}
let bytes_written = runtime.write_input(&id, &input).await?;
Ok(Json(ProcessInputResponse { bytes_written }))
}
/// Resize a process terminal.
///
/// Sets the PTY window size (columns and rows) for a tty-mode process and
/// sends SIGWINCH so the child process can adapt.
#[utoipa::path(
post,
path = "/v1/processes/{id}/terminal/resize",
tag = "v1",
params(
("id" = String, Path, description = "Process ID")
),
request_body = ProcessTerminalResizeRequest,
responses(
(status = 200, description = "Resize accepted", body = ProcessTerminalResizeResponse),
(status = 400, description = "Invalid request", body = ProblemDetails),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 409, description = "Not a terminal process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn post_v1_process_terminal_resize(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Json(body): Json<ProcessTerminalResizeRequest>,
) -> Result<Json<ProcessTerminalResizeResponse>, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
state
.process_runtime()
.resize_terminal(&id, body.cols, body.rows)
.await?;
Ok(Json(ProcessTerminalResizeResponse {
cols: body.cols,
rows: body.rows,
}))
}
/// Open an interactive WebSocket terminal session.
///
/// Upgrades the connection to a WebSocket for bidirectional PTY I/O. Accepts
/// `access_token` query param for browser-based auth (WebSocket API cannot
/// send custom headers). Streams raw PTY output as binary frames and accepts
/// JSON control frames for input, resize, and close.
#[utoipa::path(
get,
path = "/v1/processes/{id}/terminal/ws",
tag = "v1",
params(
("id" = String, Path, description = "Process ID"),
("access_token" = Option<String>, Query, description = "Bearer token alternative for WS auth")
),
responses(
(status = 101, description = "WebSocket upgraded"),
(status = 400, description = "Invalid websocket frame or upgrade request", body = ProblemDetails),
(status = 404, description = "Unknown process", body = ProblemDetails),
(status = 409, description = "Not a terminal process", body = ProblemDetails),
(status = 501, description = "Process API unsupported on this platform", body = ProblemDetails)
)
)]
async fn get_v1_process_terminal_ws(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
Query(_query): Query<ProcessWsQuery>,
ws: WebSocketUpgrade,
) -> Result<Response, ApiError> {
if !process_api_supported() {
return Err(process_api_not_supported().into());
}
let runtime = state.process_runtime();
if !runtime.is_tty(&id).await? {
return Err(SandboxError::Conflict {
message: "process is not running in tty mode".to_string(),
}
.into());
}
Ok(ws
.on_upgrade(move |socket| process_terminal_ws_session(socket, runtime, id))
.into_response())
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
enum TerminalClientFrame {
Input {
data: String,
#[serde(default)]
encoding: Option<String>,
},
Resize {
cols: u16,
rows: u16,
},
Close,
}
async fn process_terminal_ws_session(
mut socket: WebSocket,
runtime: Arc<ProcessRuntime>,
id: String,
) {
let _ = send_ws_json(
&mut socket,
json!({
"type": "ready",
"processId": &id,
}),
)
.await;
let mut log_rx = match runtime.subscribe_logs(&id).await {
Ok(rx) => rx,
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
let _ = socket.close().await;
return;
}
};
let mut exit_poll = tokio::time::interval(Duration::from_millis(150));
loop {
tokio::select! {
ws_in = socket.recv() => {
match ws_in {
Some(Ok(Message::Binary(_))) => {
let _ = send_ws_error(&mut socket, "binary input is not supported; use text JSON frames").await;
}
Some(Ok(Message::Text(text))) => {
let parsed = serde_json::from_str::<TerminalClientFrame>(&text);
match parsed {
Ok(TerminalClientFrame::Input { data, encoding }) => {
let input = match decode_input_bytes(&data, encoding.as_deref().unwrap_or("utf8")) {
Ok(input) => input,
Err(err) => {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
continue;
}
};
let max_input = runtime.max_input_bytes().await;
if input.len() > max_input {
let _ = send_ws_error(&mut socket, &format!("input payload exceeds maxInputBytesPerRequest ({max_input})")).await;
continue;
}
if let Err(err) = runtime.write_input(&id, &input).await {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
}
}
Ok(TerminalClientFrame::Resize { cols, rows }) => {
if let Err(err) = runtime.resize_terminal(&id, cols, rows).await {
let _ = send_ws_error(&mut socket, &err.to_string()).await;
}
}
Ok(TerminalClientFrame::Close) => {
let _ = socket.close().await;
break;
}
Err(err) => {
let _ = send_ws_error(&mut socket, &format!("invalid terminal frame: {err}")).await;
}
}
}
Some(Ok(Message::Ping(payload))) => {
let _ = socket.send(Message::Pong(payload)).await;
}
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(Message::Pong(_))) => {}
Some(Err(_)) => break,
}
}
log_in = log_rx.recv() => {
match log_in {
Ok(line) => {
if line.stream != ProcessStream::Pty {
continue;
}
let bytes = {
use base64::engine::general_purpose::STANDARD as BASE64_ENGINE;
use base64::Engine;
BASE64_ENGINE.decode(&line.data).unwrap_or_default()
};
if socket.send(Message::Binary(bytes)).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
_ = exit_poll.tick() => {
if let Ok(snapshot) = runtime.snapshot(&id).await {
if snapshot.status == ProcessStatus::Exited {
let _ = send_ws_json(
&mut socket,
json!({
"type": "exit",
"exitCode": snapshot.exit_code,
}),
)
.await;
let _ = socket.close().await;
break;
}
}
}
}
}
}
async fn send_ws_json(socket: &mut WebSocket, payload: Value) -> Result<(), ()> {
socket
.send(Message::Text(
serde_json::to_string(&payload).map_err(|_| ())?,
))
.await
.map_err(|_| ())
}
async fn send_ws_error(socket: &mut WebSocket, message: &str) -> Result<(), ()> {
send_ws_json(
socket,
json!({
"type": "error",
"message": message,
}),
)
.await
}
#[utoipa::path(
get,
path = "/v1/config/mcp",
@ -1386,6 +2134,96 @@ async fn delete_v1_acp(
Ok(StatusCode::NO_CONTENT)
}
fn process_api_supported() -> bool {
!cfg!(windows)
}
fn process_api_not_supported() -> ProblemDetails {
ProblemDetails {
type_: ErrorType::InvalidRequest.as_urn().to_string(),
title: "Not Implemented".to_string(),
status: 501,
detail: Some("process API is not implemented on Windows".to_string()),
instance: None,
extensions: serde_json::Map::new(),
}
}
fn map_process_config(config: ProcessRuntimeConfig) -> ProcessConfig {
ProcessConfig {
max_concurrent_processes: config.max_concurrent_processes,
default_run_timeout_ms: config.default_run_timeout_ms,
max_run_timeout_ms: config.max_run_timeout_ms,
max_output_bytes: config.max_output_bytes,
max_log_bytes_per_process: config.max_log_bytes_per_process,
max_input_bytes_per_request: config.max_input_bytes_per_request,
}
}
fn into_runtime_process_config(config: ProcessConfig) -> ProcessRuntimeConfig {
ProcessRuntimeConfig {
max_concurrent_processes: config.max_concurrent_processes,
default_run_timeout_ms: config.default_run_timeout_ms,
max_run_timeout_ms: config.max_run_timeout_ms,
max_output_bytes: config.max_output_bytes,
max_log_bytes_per_process: config.max_log_bytes_per_process,
max_input_bytes_per_request: config.max_input_bytes_per_request,
}
}
fn map_process_snapshot(snapshot: ProcessSnapshot) -> ProcessInfo {
ProcessInfo {
id: snapshot.id,
command: snapshot.command,
args: snapshot.args,
cwd: snapshot.cwd,
tty: snapshot.tty,
interactive: snapshot.interactive,
status: match snapshot.status {
ProcessStatus::Running => ProcessState::Running,
ProcessStatus::Exited => ProcessState::Exited,
},
pid: snapshot.pid,
exit_code: snapshot.exit_code,
created_at_ms: snapshot.created_at_ms,
exited_at_ms: snapshot.exited_at_ms,
}
}
fn into_runtime_log_stream(stream: ProcessLogsStream) -> ProcessLogFilterStream {
match stream {
ProcessLogsStream::Stdout => ProcessLogFilterStream::Stdout,
ProcessLogsStream::Stderr => ProcessLogFilterStream::Stderr,
ProcessLogsStream::Combined => ProcessLogFilterStream::Combined,
ProcessLogsStream::Pty => ProcessLogFilterStream::Pty,
}
}
fn map_process_log_line(line: crate::process_runtime::ProcessLogLine) -> ProcessLogEntry {
ProcessLogEntry {
sequence: line.sequence,
stream: match line.stream {
ProcessStream::Stdout => ProcessLogsStream::Stdout,
ProcessStream::Stderr => ProcessLogsStream::Stderr,
ProcessStream::Pty => ProcessLogsStream::Pty,
},
timestamp_ms: line.timestamp_ms,
data: line.data,
encoding: line.encoding.to_string(),
}
}
fn process_log_matches(entry: &ProcessLogEntry, stream: ProcessLogsStream) -> bool {
match stream {
ProcessLogsStream::Stdout => entry.stream == ProcessLogsStream::Stdout,
ProcessLogsStream::Stderr => entry.stream == ProcessLogsStream::Stderr,
ProcessLogsStream::Combined => {
entry.stream == ProcessLogsStream::Stdout || entry.stream == ProcessLogsStream::Stderr
}
ProcessLogsStream::Pty => entry.stream == ProcessLogsStream::Pty,
}
}
fn validate_named_query(value: &str, field_name: &str) -> Result<(), SandboxError> {
if value.trim().is_empty() {
return Err(SandboxError::InvalidRequest {

View file

@ -33,7 +33,17 @@ pub(super) async fn require_token(
.and_then(|value| value.to_str().ok())
.and_then(|value| value.strip_prefix("Bearer "));
if bearer == Some(expected.as_str()) {
let allow_query_token = request.uri().path().ends_with("/terminal/ws");
let query_token = if allow_query_token {
request
.uri()
.query()
.and_then(|query| query_param(query, "access_token"))
} else {
None
};
if bearer == Some(expected.as_str()) || query_token.as_deref() == Some(expected.as_str()) {
return Ok(next.run(request).await);
}
@ -42,6 +52,53 @@ pub(super) async fn require_token(
}))
}
fn query_param(query: &str, key: &str) -> Option<String> {
query
.split('&')
.filter_map(|part| part.split_once('='))
.find_map(|(k, v)| {
if k == key {
Some(percent_decode(v))
} else {
None
}
})
}
fn percent_decode(input: &str) -> String {
let mut output = Vec::with_capacity(input.len());
let bytes = input.as_bytes();
let mut i = 0;
while i < bytes.len() {
if bytes[i] == b'%' && i + 2 < bytes.len() {
if let (Some(hi), Some(lo)) = (
hex_nibble(bytes[i + 1]),
hex_nibble(bytes[i + 2]),
) {
output.push((hi << 4) | lo);
i += 3;
continue;
}
}
if bytes[i] == b'+' {
output.push(b' ');
} else {
output.push(bytes[i]);
}
i += 1;
}
String::from_utf8(output).unwrap_or_else(|_| input.to_string())
}
fn hex_nibble(b: u8) -> Option<u8> {
match b {
b'0'..=b'9' => Some(b - b'0'),
b'a'..=b'f' => Some(b - b'a' + 10),
b'A'..=b'F' => Some(b - b'A' + 10),
_ => None,
}
}
pub(super) type PinBoxSseStream = crate::acp_proxy_runtime::PinBoxSseStream;
pub(super) fn credentials_available_for(
@ -497,8 +554,17 @@ pub(super) fn problem_from_sandbox_error(error: &SandboxError) -> ProblemDetails
let mut problem = error.to_problem_details();
match error {
SandboxError::InvalidRequest { .. } => {
problem.status = 400;
SandboxError::InvalidRequest { message } => {
if message.starts_with("input payload exceeds maxInputBytesPerRequest") {
problem.status = 413;
problem.title = "Payload Too Large".to_string();
} else {
problem.status = 400;
}
}
SandboxError::NotFound { .. } => {
problem.status = 404;
problem.title = "Not Found".to_string();
}
SandboxError::Timeout { .. } => {
problem.status = 504;

View file

@ -362,3 +362,173 @@ pub struct AcpEnvelope {
#[serde(default)]
pub error: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessConfig {
pub max_concurrent_processes: usize,
pub default_run_timeout_ms: u64,
pub max_run_timeout_ms: u64,
pub max_output_bytes: usize,
pub max_log_bytes_per_process: usize,
pub max_input_bytes_per_request: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessCreateRequest {
pub command: String,
#[serde(default)]
pub args: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub env: BTreeMap<String, String>,
#[serde(default)]
pub tty: bool,
#[serde(default)]
pub interactive: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessRunRequest {
pub command: String,
#[serde(default)]
pub args: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub env: BTreeMap<String, String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timeout_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_output_bytes: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessRunResponse {
pub exit_code: Option<i32>,
pub timed_out: bool,
pub stdout: String,
pub stderr: String,
pub stdout_truncated: bool,
pub stderr_truncated: bool,
pub duration_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessState {
Running,
Exited,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInfo {
pub id: String,
pub command: String,
pub args: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cwd: Option<String>,
pub tty: bool,
pub interactive: bool,
pub status: ProcessState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pid: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub exit_code: Option<i32>,
pub created_at_ms: i64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub exited_at_ms: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessListResponse {
pub processes: Vec<ProcessInfo>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, ToSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProcessLogsStream {
Stdout,
Stderr,
Combined,
Pty,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogsQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub stream: Option<ProcessLogsStream>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tail: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub follow: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub since: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogEntry {
pub sequence: u64,
pub stream: ProcessLogsStream,
pub timestamp_ms: i64,
pub data: String,
pub encoding: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogsResponse {
pub process_id: String,
pub stream: ProcessLogsStream,
pub entries: Vec<ProcessLogEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInputRequest {
pub data: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub encoding: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInputResponse {
pub bytes_written: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessSignalQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wait_ms: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeRequest {
pub cols: u16,
pub rows: u16,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessTerminalResizeResponse {
pub cols: u16,
pub rows: u16,
}
#[derive(Debug, Clone, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessWsQuery {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub access_token: Option<String>,
}

View file

@ -1,6 +1,6 @@
use std::fs;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::path::Path;
use std::time::Duration;
@ -14,6 +14,8 @@ use sandbox_agent_agent_management::agents::AgentManager;
use serde_json::{json, Value};
use serial_test::serial;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tower::util::ServiceExt;
struct TestApp {
@ -48,6 +50,56 @@ struct EnvVarGuard {
previous: Option<std::ffi::OsString>,
}
struct LiveServer {
address: SocketAddr,
shutdown_tx: Option<oneshot::Sender<()>>,
task: JoinHandle<()>,
}
impl LiveServer {
async fn spawn(app: Router) -> Self {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind live server");
let address = listener.local_addr().expect("live server address");
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let task = tokio::spawn(async move {
let server = axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(async {
let _ = shutdown_rx.await;
});
let _ = server.await;
});
Self {
address,
shutdown_tx: Some(shutdown_tx),
task,
}
}
fn http_url(&self, path: &str) -> String {
format!("http://{}{}", self.address, path)
}
fn ws_url(&self, path: &str) -> String {
format!("ws://{}{}", self.address, path)
}
async fn shutdown(mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
let _ = tokio::time::timeout(Duration::from_secs(3), async {
let _ = self.task.await;
})
.await;
}
}
impl EnvVarGuard {
fn set(key: &'static str, value: &str) -> Self {
let previous = std::env::var_os(key);
@ -291,3 +343,5 @@ mod acp_transport;
mod config_endpoints;
#[path = "v1_api/control_plane.rs"]
mod control_plane;
#[path = "v1_api/processes.rs"]
mod processes;

View file

@ -0,0 +1,661 @@
use super::*;
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::Engine;
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
async fn wait_for_exited(test_app: &TestApp, process_id: &str) {
for _ in 0..30 {
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let parsed = parse_json(&body);
if parsed["status"] == "exited" {
return;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
panic!("process did not exit in time");
}
fn decode_log_entries(entries: &[Value]) -> String {
entries
.iter()
.filter_map(|entry| entry.get("data").and_then(Value::as_str))
.filter_map(|encoded| BASE64.decode(encoded).ok())
.map(|bytes| String::from_utf8_lossy(&bytes).to_string())
.collect::<Vec<_>>()
.join("")
}
async fn recv_ws_message(
ws: &mut tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
) -> Message {
tokio::time::timeout(Duration::from_secs(3), ws.next())
.await
.expect("timed out waiting for websocket frame")
.expect("websocket stream ended")
.expect("websocket frame")
}
#[tokio::test]
async fn v1_processes_config_round_trip() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/processes/config",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["maxConcurrentProcesses"], 64);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/config",
Some(json!({
"maxConcurrentProcesses": 8,
"defaultRunTimeoutMs": 1000,
"maxRunTimeoutMs": 5000,
"maxOutputBytes": 4096,
"maxLogBytesPerProcess": 32768,
"maxInputBytesPerRequest": 1024
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let parsed = parse_json(&body);
assert_eq!(parsed["maxConcurrentProcesses"], 8);
assert_eq!(parsed["defaultRunTimeoutMs"], 1000);
}
#[tokio::test]
async fn v1_process_lifecycle_requires_stop_before_delete() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "sh",
"args": ["-lc", "sleep 30"],
"tty": false,
"interactive": false
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let (status, _, body) = send_request(
&test_app.app,
Method::DELETE,
&format!("/v1/processes/{process_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::CONFLICT);
assert_eq!(parse_json(&body)["status"], 409);
let (status, _, _body) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{process_id}/stop"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
wait_for_exited(&test_app, &process_id).await;
let (status, _, _) = send_request(
&test_app.app,
Method::DELETE,
&format!("/v1/processes/{process_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn v1_process_run_returns_output_and_timeout() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/run",
Some(json!({
"command": "sh",
"args": ["-lc", "echo hi"],
"timeoutMs": 1000
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let parsed = parse_json(&body);
assert_eq!(parsed["timedOut"], false);
assert_eq!(parsed["exitCode"], 0);
assert!(parsed["stdout"].as_str().unwrap_or_default().contains("hi"));
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/run",
Some(json!({
"command": "sh",
"args": ["-lc", "sleep 2"],
"timeoutMs": 50
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(parse_json(&body)["timedOut"], true);
}
#[tokio::test]
async fn v1_process_run_reports_truncation() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/run",
Some(json!({
"command": "sh",
"args": ["-lc", "printf 'abcdefghijklmnopqrstuvwxyz'"],
"maxOutputBytes": 5
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let parsed = parse_json(&body);
assert_eq!(parsed["stdoutTruncated"], true);
assert_eq!(parsed["stderrTruncated"], false);
assert_eq!(parsed["stdout"].as_str().unwrap_or_default().len(), 5);
}
#[tokio::test]
async fn v1_process_tty_input_and_logs() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "cat",
"tty": true,
"interactive": true
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let (status, _, _body) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{process_id}/input"),
Some(json!({
"data": "aGVsbG8K",
"encoding": "base64"
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
tokio::time::sleep(Duration::from_millis(150)).await;
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}/logs?stream=pty&tail=20"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let entries = parse_json(&body)["entries"]
.as_array()
.cloned()
.unwrap_or_default();
assert!(!entries.is_empty());
let (status, _, _body) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{process_id}/kill"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
wait_for_exited(&test_app, &process_id).await;
let (status, _, _) = send_request(
&test_app.app,
Method::DELETE,
&format!("/v1/processes/{process_id}"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT);
}
#[tokio::test]
async fn v1_process_not_found_returns_404() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
"/v1/processes/does-not-exist",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::NOT_FOUND);
assert_eq!(parse_json(&body)["status"], 404);
}
#[tokio::test]
async fn v1_process_input_limit_returns_413() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, _) = send_request(
&test_app.app,
Method::POST,
"/v1/processes/config",
Some(json!({
"maxConcurrentProcesses": 8,
"defaultRunTimeoutMs": 1000,
"maxRunTimeoutMs": 5000,
"maxOutputBytes": 4096,
"maxLogBytesPerProcess": 32768,
"maxInputBytesPerRequest": 4
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "cat",
"tty": true,
"interactive": true
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
&format!("/v1/processes/{process_id}/input"),
Some(json!({
"data": "aGVsbG8=",
"encoding": "base64"
})),
&[],
)
.await;
assert_eq!(status, StatusCode::PAYLOAD_TOO_LARGE);
assert_eq!(parse_json(&body)["status"], 413);
}
#[tokio::test]
async fn v1_tty_process_is_real_terminal() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "sh",
"args": ["-lc", "tty"],
"tty": true,
"interactive": false
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
wait_for_exited(&test_app, &process_id).await;
let (status, _, body) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}/logs?stream=pty"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let entries = parse_json(&body)["entries"]
.as_array()
.cloned()
.unwrap_or_default();
let joined = decode_log_entries(&entries);
assert!(!joined.to_lowercase().contains("not a tty"));
assert!(joined.contains("/dev/"));
}
#[tokio::test]
async fn v1_process_logs_follow_sse_streams_entries() {
let test_app = TestApp::new(AuthConfig::disabled());
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "sh",
"args": ["-lc", "echo first; sleep 0.3; echo second"],
"tty": false,
"interactive": false
})),
&[],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let request = Request::builder()
.method(Method::GET)
.uri(format!(
"/v1/processes/{process_id}/logs?stream=stdout&follow=true"
))
.body(Body::empty())
.expect("build request");
let response = test_app
.app
.clone()
.oneshot(request)
.await
.expect("sse response");
assert_eq!(response.status(), StatusCode::OK);
let mut stream = response.into_body().into_data_stream();
let chunk = tokio::time::timeout(Duration::from_secs(5), async move {
while let Some(chunk) = stream.next().await {
let bytes = chunk.expect("stream chunk");
let text = String::from_utf8_lossy(&bytes).to_string();
if text.contains("data:") {
return text;
}
}
panic!("SSE stream ended before log chunk");
})
.await
.expect("timed out reading process log sse");
let payload = parse_sse_data(&chunk);
assert!(payload["sequence"].as_u64().is_some());
assert_eq!(payload["stream"], "stdout");
}
#[tokio::test]
async fn v1_access_token_query_only_allows_terminal_ws() {
let test_app = TestApp::new(AuthConfig::with_token("secret-token".to_string()));
let (status, _, _) = send_request(
&test_app.app,
Method::GET,
"/v1/health?access_token=secret-token",
None,
&[],
)
.await;
assert_eq!(status, StatusCode::UNAUTHORIZED);
let (status, _, body) = send_request(
&test_app.app,
Method::POST,
"/v1/processes",
Some(json!({
"command": "cat",
"tty": true,
"interactive": true
})),
&[("authorization", "Bearer secret-token")],
)
.await;
assert_eq!(status, StatusCode::OK);
let process_id = parse_json(&body)["id"]
.as_str()
.expect("process id")
.to_string();
let (status, _, _) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}/terminal/ws"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::UNAUTHORIZED);
let (status, _, _) = send_request(
&test_app.app,
Method::GET,
&format!("/v1/processes/{process_id}/terminal/ws?access_token=secret-token"),
None,
&[],
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn v1_process_terminal_ws_e2e_is_deterministic() {
let test_app = TestApp::new(AuthConfig::disabled());
let live_server = LiveServer::spawn(test_app.app.clone()).await;
let http = reqwest::Client::new();
let create_response = http
.post(live_server.http_url("/v1/processes"))
.json(&json!({
"command": "sh",
"args": ["-lc", "stty -echo; IFS= read -r line; printf 'got:%s\\n' \"$line\""],
"tty": true,
"interactive": true
}))
.send()
.await
.expect("create process response");
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
let create_body: Value = create_response.json().await.expect("create process json");
let process_id = create_body["id"]
.as_str()
.expect("process id")
.to_string();
let ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
let (mut ws, _) = connect_async(&ws_url)
.await
.expect("connect websocket");
let ready = recv_ws_message(&mut ws).await;
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
.expect("ready json");
assert_eq!(ready_payload["type"], "ready");
assert_eq!(ready_payload["processId"], process_id);
ws.send(Message::Text(
json!({
"type": "input",
"data": "hello from ws\n"
})
.to_string(),
))
.await
.expect("send input frame");
let mut saw_binary_output = false;
let mut saw_exit = false;
for _ in 0..10 {
let frame = recv_ws_message(&mut ws).await;
match frame {
Message::Binary(bytes) => {
let text = String::from_utf8_lossy(&bytes);
if text.contains("got:hello from ws") {
saw_binary_output = true;
}
}
Message::Text(text) => {
let payload: Value = serde_json::from_str(&text).expect("ws json");
if payload["type"] == "exit" {
saw_exit = true;
break;
}
assert_ne!(payload["type"], "error");
}
Message::Close(_) => break,
Message::Ping(_) | Message::Pong(_) => {}
_ => {}
}
}
assert!(saw_binary_output, "expected pty binary output over websocket");
assert!(saw_exit, "expected exit control frame over websocket");
let _ = ws.close(None).await;
let delete_response = http
.delete(live_server.http_url(&format!("/v1/processes/{process_id}")))
.send()
.await
.expect("delete process response");
assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT);
live_server.shutdown().await;
}
#[tokio::test]
async fn v1_process_terminal_ws_auth_e2e() {
let token = "secret-token";
let test_app = TestApp::new(AuthConfig::with_token(token.to_string()));
let live_server = LiveServer::spawn(test_app.app.clone()).await;
let http = reqwest::Client::new();
let create_response = http
.post(live_server.http_url("/v1/processes"))
.bearer_auth(token)
.json(&json!({
"command": "cat",
"tty": true,
"interactive": true
}))
.send()
.await
.expect("create process response");
assert_eq!(create_response.status(), reqwest::StatusCode::OK);
let create_body: Value = create_response.json().await.expect("create process json");
let process_id = create_body["id"]
.as_str()
.expect("process id")
.to_string();
let unauth_ws_url = live_server.ws_url(&format!("/v1/processes/{process_id}/terminal/ws"));
let unauth_err = connect_async(&unauth_ws_url)
.await
.expect_err("unauthenticated websocket handshake should fail");
match unauth_err {
tokio_tungstenite::tungstenite::Error::Http(response) => {
assert_eq!(response.status().as_u16(), 401);
}
other => panic!("unexpected websocket auth error: {other:?}"),
}
let auth_ws_url = live_server.ws_url(&format!(
"/v1/processes/{process_id}/terminal/ws?access_token={token}"
));
let (mut ws, _) = connect_async(&auth_ws_url)
.await
.expect("authenticated websocket handshake");
let ready = recv_ws_message(&mut ws).await;
let ready_payload: Value = serde_json::from_str(ready.to_text().expect("ready text frame"))
.expect("ready json");
assert_eq!(ready_payload["type"], "ready");
assert_eq!(ready_payload["processId"], process_id);
let _ = ws
.send(Message::Text(json!({ "type": "close" }).to_string()))
.await;
let _ = ws.close(None).await;
let kill_response = http
.post(live_server.http_url(&format!(
"/v1/processes/{process_id}/kill?waitMs=1000"
)))
.bearer_auth(token)
.send()
.await
.expect("kill process response");
assert_eq!(kill_response.status(), reqwest::StatusCode::OK);
let delete_response = http
.delete(live_server.http_url(&format!("/v1/processes/{process_id}")))
.bearer_auth(token)
.send()
.await
.expect("delete process response");
assert_eq!(delete_response.status(), reqwest::StatusCode::NO_CONTENT);
live_server.shutdown().await;
}