feat: implement PTY lifecycle with streaming IO endpoints

This commit is contained in:
Nathan Flurry 2026-02-04 14:33:52 -08:00
parent 7378abee46
commit dc6cc4fcbb
11 changed files with 583 additions and 74 deletions

1
.turbo Symbolic link
View file

@ -0,0 +1 @@
/home/nathan/sandbox-agent/.turbo

1
dist Symbolic link
View file

@ -0,0 +1 @@
/home/nathan/sandbox-agent/dist

1
node_modules Symbolic link
View file

@ -0,0 +1 @@
/home/nathan/sandbox-agent/node_modules

View file

@ -36,6 +36,7 @@ tracing-logfmt.workspace = true
tracing-subscriber.workspace = true
include_dir.workspace = true
base64.workspace = true
portable-pty = "0.8"
tempfile = { workspace = true, optional = true }
[target.'cfg(unix)'.dependencies]

View file

@ -3,6 +3,7 @@
mod agent_server_logs;
pub mod credentials;
pub mod opencode_compat;
pub mod pty;
pub mod router;
pub mod server_logs;
pub mod telemetry;

View file

@ -10,19 +10,20 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::str::FromStr;
use axum::extract::{Path, Query, State};
use axum::extract::{ws::{Message, WebSocket, WebSocketUpgrade}, Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::sse::{Event, KeepAlive};
use axum::response::{IntoResponse, Sse};
use axum::routing::{get, patch, post, put};
use axum::{Json, Router};
use futures::stream;
use futures::{stream, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::sync::{broadcast, Mutex};
use tokio::time::interval;
use utoipa::{IntoParams, OpenApi, ToSchema};
use crate::pty::{PtyConnection, PtyInfo, PtySpawnRequest, PtyUpdateRequest};
use crate::router::{AppState, CreateSessionRequest, PermissionReply};
use sandbox_agent_error::SandboxError;
use sandbox_agent_agent_management::agents::AgentId;
@ -125,31 +126,6 @@ struct OpenCodeMessageRecord {
parts: Vec<Value>,
}
#[derive(Clone, Debug)]
struct OpenCodePtyRecord {
id: String,
title: String,
command: String,
args: Vec<String>,
cwd: String,
status: String,
pid: i64,
}
impl OpenCodePtyRecord {
fn to_value(&self) -> Value {
json!({
"id": self.id,
"title": self.title,
"command": self.command,
"args": self.args,
"cwd": self.cwd,
"status": self.status,
"pid": self.pid,
})
}
}
#[derive(Clone, Debug)]
struct OpenCodePermissionRecord {
id: String,
@ -219,7 +195,6 @@ pub struct OpenCodeState {
default_project_id: String,
sessions: Mutex<HashMap<String, OpenCodeSessionRecord>>,
messages: Mutex<HashMap<String, Vec<OpenCodeMessageRecord>>>,
ptys: Mutex<HashMap<String, OpenCodePtyRecord>>,
permissions: Mutex<HashMap<String, OpenCodePermissionRecord>>,
questions: Mutex<HashMap<String, OpenCodeQuestionRecord>>,
session_runtime: Mutex<HashMap<String, OpenCodeSessionRuntime>>,
@ -236,7 +211,6 @@ impl OpenCodeState {
default_project_id: project_id,
sessions: Mutex::new(HashMap::new()),
messages: Mutex::new(HashMap::new()),
ptys: Mutex::new(HashMap::new()),
permissions: Mutex::new(HashMap::new()),
questions: Mutex::new(HashMap::new()),
session_runtime: Mutex::new(HashMap::new()),
@ -568,6 +542,7 @@ struct PtyCreateRequest {
args: Option<Vec<String>>,
cwd: Option<String>,
title: Option<String>,
env: Option<HashMap<String, String>>,
}
fn next_id(prefix: &str, counter: &AtomicU64) -> String {
@ -1075,6 +1050,18 @@ fn build_file_part_from_path(
Value::Object(map)
}
fn pty_value(info: &PtyInfo) -> Value {
json!({
"id": info.id,
"title": info.title,
"command": info.command,
"args": info.args,
"cwd": info.cwd,
"status": info.status.as_str(),
"pid": info.pid,
})
}
fn session_event(event_type: &str, session: &Value) -> Value {
json!({
"type": event_type,
@ -3614,6 +3601,65 @@ async fn oc_auth_remove(Path(_provider_id): Path<String>) -> impl IntoResponse {
bool_ok(true)
}
fn spawn_pty_exit_listener(state: Arc<OpenCodeAppState>, pty_id: String) {
tokio::spawn(async move {
let mut exit_rx = match state
.inner
.session_manager()
.pty_manager()
.subscribe_exit(&pty_id)
.await
{
Some(receiver) => receiver,
None => return,
};
loop {
match exit_rx.recv().await {
Ok(exit) => {
state.opencode.emit_event(json!({
"type": "pty.exited",
"properties": {"id": exit.id, "exitCode": exit.exit_code}
}));
break;
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
}
async fn handle_pty_socket(mut socket: WebSocket, mut connection: PtyConnection) {
loop {
tokio::select! {
incoming = socket.recv() => {
match incoming {
Some(Ok(Message::Text(text))) => {
let _ = connection.input.send(text.into_bytes()).await;
}
Some(Ok(Message::Binary(bytes))) => {
let _ = connection.input.send(bytes).await;
}
Some(Ok(Message::Close(_))) | None => break,
Some(Ok(_)) => {}
Some(Err(_)) => break,
}
}
outgoing = connection.output.recv() => {
match outgoing {
Ok(bytes) => {
if socket.send(Message::Binary(bytes)).await.is_err() {
break;
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
}
#[utoipa::path(
get,
path = "/pty",
@ -3621,8 +3667,15 @@ async fn oc_auth_remove(Path(_provider_id): Path<String>) -> impl IntoResponse {
tag = "opencode"
)]
async fn oc_pty_list(State(state): State<Arc<OpenCodeAppState>>) -> impl IntoResponse {
let ptys = state.opencode.ptys.lock().await;
let values: Vec<Value> = ptys.values().map(|p| p.to_value()).collect();
let values: Vec<Value> = state
.inner
.session_manager()
.pty_manager()
.list()
.await
.iter()
.map(pty_value)
.collect();
(StatusCode::OK, Json(json!(values)))
}
@ -3641,25 +3694,38 @@ async fn oc_pty_create(
) -> impl IntoResponse {
let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
let id = next_id("pty_", &PTY_COUNTER);
let record = OpenCodePtyRecord {
id: id.clone(),
title: body.title.unwrap_or_else(|| "PTY".to_string()),
command: body.command.unwrap_or_else(|| "bash".to_string()),
args: body.args.unwrap_or_default(),
cwd: body.cwd.unwrap_or_else(|| directory),
status: "running".to_string(),
pid: 0,
};
let value = record.to_value();
let mut ptys = state.opencode.ptys.lock().await;
ptys.insert(id, record);
drop(ptys);
let spawn = state
.inner
.session_manager()
.pty_manager()
.spawn(PtySpawnRequest {
id: id.clone(),
title: body.title.unwrap_or_else(|| "PTY".to_string()),
command: body.command.unwrap_or_else(|| "bash".to_string()),
args: body.args.unwrap_or_default(),
cwd: body.cwd.unwrap_or_else(|| directory),
env: body.env,
owner_session_id: headers
.get("x-opencode-session-id")
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string()),
})
.await;
state
.opencode
.emit_event(json!({"type": "pty.created", "properties": {"pty": value}}));
(StatusCode::OK, Json(value))
match spawn {
Ok(info) => {
let value = pty_value(&info);
state
.opencode
.emit_event(json!({"type": "pty.created", "properties": {"info": value}}));
spawn_pty_exit_listener(state.clone(), info.id.clone());
(StatusCode::OK, Json(value)).into_response()
}
Err(SandboxError::InvalidRequest { message }) => {
bad_request(&message).into_response()
}
Err(err) => internal_error(&err.to_string()).into_response(),
}
}
#[utoipa::path(
@ -3673,9 +3739,14 @@ async fn oc_pty_get(
State(state): State<Arc<OpenCodeAppState>>,
Path(pty_id): Path<String>,
) -> impl IntoResponse {
let ptys = state.opencode.ptys.lock().await;
if let Some(pty) = ptys.get(&pty_id) {
return (StatusCode::OK, Json(pty.to_value())).into_response();
if let Some(pty) = state
.inner
.session_manager()
.pty_manager()
.get(&pty_id)
.await
{
return (StatusCode::OK, Json(pty_value(&pty))).into_response();
}
not_found("PTY not found").into_response()
}
@ -3693,24 +3764,23 @@ async fn oc_pty_update(
Path(pty_id): Path<String>,
Json(body): Json<PtyCreateRequest>,
) -> impl IntoResponse {
let mut ptys = state.opencode.ptys.lock().await;
if let Some(pty) = ptys.get_mut(&pty_id) {
if let Some(title) = body.title {
pty.title = title;
}
if let Some(command) = body.command {
pty.command = command;
}
if let Some(args) = body.args {
pty.args = args;
}
if let Some(cwd) = body.cwd {
pty.cwd = cwd;
}
let value = pty.to_value();
let update = PtyUpdateRequest {
title: body.title,
command: body.command,
args: body.args,
cwd: body.cwd,
};
if let Some(pty) = state
.inner
.session_manager()
.pty_manager()
.update(&pty_id, update)
.await
{
let value = pty_value(&pty);
state
.opencode
.emit_event(json!({"type": "pty.updated", "properties": {"pty": value}}));
.emit_event(json!({"type": "pty.updated", "properties": {"info": value}}));
return (StatusCode::OK, Json(value)).into_response();
}
not_found("PTY not found").into_response()
@ -3727,11 +3797,17 @@ async fn oc_pty_delete(
State(state): State<Arc<OpenCodeAppState>>,
Path(pty_id): Path<String>,
) -> impl IntoResponse {
let mut ptys = state.opencode.ptys.lock().await;
if let Some(pty) = ptys.remove(&pty_id) {
if state
.inner
.session_manager()
.pty_manager()
.remove(&pty_id)
.await
.is_some()
{
state
.opencode
.emit_event(json!({"type": "pty.deleted", "properties": {"pty": pty.to_value()}}));
.emit_event(json!({"type": "pty.deleted", "properties": {"id": pty_id}}));
return bool_ok(true).into_response();
}
not_found("PTY not found").into_response()
@ -3744,8 +3820,26 @@ async fn oc_pty_delete(
responses((status = 200)),
tag = "opencode"
)]
async fn oc_pty_connect(Path(_pty_id): Path<String>) -> impl IntoResponse {
bool_ok(true)
async fn oc_pty_connect(
State(state): State<Arc<OpenCodeAppState>>,
Path(pty_id): Path<String>,
ws: Option<WebSocketUpgrade>,
) -> impl IntoResponse {
let connection = state
.inner
.session_manager()
.pty_manager()
.connect(&pty_id)
.await;
let Some(connection) = connection else {
return not_found("PTY not found").into_response();
};
if let Some(ws) = ws {
ws.on_upgrade(|socket| handle_pty_socket(socket, connection))
.into_response()
} else {
bool_ok(true).into_response()
}
}
#[utoipa::path(

View file

@ -0,0 +1,295 @@
use std::collections::HashMap;
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use portable_pty::{native_pty_system, CommandBuilder, PtySize};
use tokio::sync::{broadcast, mpsc, Mutex as AsyncMutex};
use sandbox_agent_error::SandboxError;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PtyStatus {
Running,
Exited,
}
impl PtyStatus {
pub fn as_str(&self) -> &'static str {
match self {
PtyStatus::Running => "running",
PtyStatus::Exited => "exited",
}
}
}
#[derive(Clone, Debug)]
pub struct PtyInfo {
pub id: String,
pub title: String,
pub command: String,
pub args: Vec<String>,
pub cwd: String,
pub status: PtyStatus,
pub pid: i64,
pub exit_code: Option<i32>,
pub owner_session_id: Option<String>,
}
#[derive(Clone, Debug)]
pub struct PtySpawnRequest {
pub id: String,
pub title: String,
pub command: String,
pub args: Vec<String>,
pub cwd: String,
pub env: Option<HashMap<String, String>>,
pub owner_session_id: Option<String>,
}
#[derive(Clone, Debug, Default)]
pub struct PtyUpdateRequest {
pub title: Option<String>,
pub command: Option<String>,
pub args: Option<Vec<String>>,
pub cwd: Option<String>,
}
#[derive(Clone, Debug)]
pub struct PtyExit {
pub id: String,
pub exit_code: i32,
}
#[derive(Clone)]
pub struct PtyConnection {
pub output: broadcast::Receiver<Vec<u8>>,
pub input: mpsc::Sender<Vec<u8>>,
}
struct PtyInstance {
info: Mutex<PtyInfo>,
output_tx: broadcast::Sender<Vec<u8>>,
input_tx: mpsc::Sender<Vec<u8>>,
exit_tx: broadcast::Sender<PtyExit>,
}
#[derive(Default)]
pub struct PtyManager {
ptys: AsyncMutex<HashMap<String, Arc<PtyInstance>>>,
}
impl PtyManager {
pub fn new() -> Self {
Self {
ptys: AsyncMutex::new(HashMap::new()),
}
}
pub async fn spawn(&self, request: PtySpawnRequest) -> Result<PtyInfo, SandboxError> {
if request.command.trim().is_empty() {
return Err(SandboxError::InvalidRequest {
message: "command is required".to_string(),
});
}
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.map_err(|err| SandboxError::InvalidRequest {
message: format!("failed to open PTY: {err}"),
})?;
let mut cmd = CommandBuilder::new(&request.command);
cmd.args(&request.args);
cmd.cwd(&request.cwd);
if let Some(env) = &request.env {
for (key, value) in env {
cmd.env(key, value);
}
}
let child = pair
.slave
.spawn_command(cmd)
.map_err(|err| SandboxError::InvalidRequest {
message: format!("failed to spawn PTY command: {err}"),
})?;
let pid = child.process_id().unwrap_or(0) as i64;
let (output_tx, _) = broadcast::channel(512);
let (exit_tx, _) = broadcast::channel(8);
let (input_tx, mut input_rx) = mpsc::channel(256);
let info = PtyInfo {
id: request.id.clone(),
title: request.title.clone(),
command: request.command.clone(),
args: request.args.clone(),
cwd: request.cwd.clone(),
status: PtyStatus::Running,
pid,
exit_code: None,
owner_session_id: request.owner_session_id.clone(),
};
let instance = Arc::new(PtyInstance {
info: Mutex::new(info.clone()),
output_tx,
input_tx: input_tx.clone(),
exit_tx,
});
let mut ptys = self.ptys.lock().await;
ptys.insert(request.id.clone(), instance.clone());
drop(ptys);
let output_tx = instance.output_tx.clone();
let mut reader = pair
.master
.try_clone_reader()
.map_err(|err| SandboxError::InvalidRequest {
message: format!("failed to clone PTY reader: {err}"),
})?;
tokio::task::spawn_blocking(move || {
let mut buffer = [0u8; 8192];
loop {
match reader.read(&mut buffer) {
Ok(0) => break,
Ok(count) => {
let _ = output_tx.send(buffer[..count].to_vec());
}
Err(_) => break,
}
}
});
let mut writer = pair
.master
.take_writer()
.map_err(|err| SandboxError::InvalidRequest {
message: format!("failed to take PTY writer: {err}"),
})?;
tokio::task::spawn_blocking(move || {
while let Some(payload) = input_rx.blocking_recv() {
if writer.write_all(&payload).is_err() {
break;
}
if writer.flush().is_err() {
break;
}
}
});
let exit_tx = instance.exit_tx.clone();
let info_ref = Arc::clone(&instance);
tokio::task::spawn_blocking(move || {
let exit_code = child
.wait()
.ok()
.and_then(|status| status.exit_code().map(|code| code as i32));
let mut info = info_ref.info.lock().expect("pty info lock");
info.status = PtyStatus::Exited;
info.exit_code = exit_code;
let code = exit_code.unwrap_or(-1);
let _ = exit_tx.send(PtyExit {
id: info.id.clone(),
exit_code: code,
});
});
Ok(info)
}
pub async fn list(&self) -> Vec<PtyInfo> {
let ptys = self.ptys.lock().await;
ptys.values()
.map(|pty| pty.info.lock().expect("pty info lock").clone())
.collect()
}
pub async fn get(&self, pty_id: &str) -> Option<PtyInfo> {
let ptys = self.ptys.lock().await;
ptys.get(pty_id)
.map(|pty| pty.info.lock().expect("pty info lock").clone())
}
pub async fn update(&self, pty_id: &str, update: PtyUpdateRequest) -> Option<PtyInfo> {
let ptys = self.ptys.lock().await;
let pty = ptys.get(pty_id)?;
let mut info = pty.info.lock().expect("pty info lock");
if let Some(title) = update.title {
info.title = title;
}
if let Some(command) = update.command {
info.command = command;
}
if let Some(args) = update.args {
info.args = args;
}
if let Some(cwd) = update.cwd {
info.cwd = cwd;
}
Some(info.clone())
}
pub async fn remove(&self, pty_id: &str) -> Option<PtyInfo> {
let mut ptys = self.ptys.lock().await;
let pty = ptys.remove(pty_id)?;
let info = pty.info.lock().expect("pty info lock").clone();
terminate_process(info.pid);
Some(info)
}
pub async fn connect(&self, pty_id: &str) -> Option<PtyConnection> {
let ptys = self.ptys.lock().await;
let pty = ptys.get(pty_id)?.clone();
Some(PtyConnection {
output: pty.output_tx.subscribe(),
input: pty.input_tx.clone(),
})
}
pub async fn subscribe_exit(&self, pty_id: &str) -> Option<broadcast::Receiver<PtyExit>> {
let ptys = self.ptys.lock().await;
let pty = ptys.get(pty_id)?.clone();
Some(pty.exit_tx.subscribe())
}
pub async fn cleanup_for_session(&self, session_id: &str) {
let ids = {
let ptys = self.ptys.lock().await;
ptys.values()
.filter(|pty| {
pty.info
.lock()
.expect("pty info lock")
.owner_session_id
.as_deref()
== Some(session_id)
})
.map(|pty| pty.info.lock().expect("pty info lock").id.clone())
.collect::<Vec<_>>()
};
for id in ids {
let _ = self.remove(&id).await;
}
}
}
#[cfg(unix)]
fn terminate_process(pid: i64) {
if pid <= 0 {
return;
}
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
}
#[cfg(not(unix))]
fn terminate_process(_pid: i64) {}

View file

@ -40,6 +40,7 @@ use utoipa::{Modify, OpenApi, ToSchema};
use crate::agent_server_logs::AgentServerLogs;
use crate::opencode_compat::{build_opencode_router, OpenCodeAppState};
use crate::pty::PtyManager;
use crate::ui;
use sandbox_agent_agent_management::agents::{
AgentError as ManagerError, AgentId, AgentManager, InstallOptions, SpawnOptions, StreamingSpawn,
@ -818,6 +819,7 @@ pub(crate) struct SessionManager {
sessions: Mutex<Vec<SessionState>>,
server_manager: Arc<AgentServerManager>,
http_client: Client,
pty_manager: Arc<PtyManager>,
}
/// Shared Codex app-server process that handles multiple sessions via JSON-RPC.
@ -1538,9 +1540,14 @@ impl SessionManager {
sessions: Mutex::new(Vec::new()),
server_manager,
http_client: Client::new(),
pty_manager: Arc::new(PtyManager::new()),
}
}
pub(crate) fn pty_manager(&self) -> Arc<PtyManager> {
self.pty_manager.clone()
}
fn session_ref<'a>(sessions: &'a [SessionState], session_id: &str) -> Option<&'a SessionState> {
sessions
.iter()
@ -1840,6 +1847,7 @@ impl SessionManager {
.unregister_session(agent, &session_id, native_session_id.as_deref())
.await;
}
self.pty_manager.cleanup_for_session(&session_id).await;
Ok(())
}

View file

@ -9,8 +9,10 @@
},
"devDependencies": {
"@types/node": "^22.0.0",
"@types/ws": "^8.5.12",
"typescript": "^5.7.0",
"vitest": "^3.0.0"
"vitest": "^3.0.0",
"ws": "^8.18.0"
},
"dependencies": {
"@opencode-ai/sdk": "^1.1.21"

View file

@ -0,0 +1,104 @@
/**
* Tests for OpenCode-compatible PTY endpoints.
*/
import { describe, it, expect, beforeAll, afterEach, beforeEach } from "vitest";
import WebSocket from "ws";
import { createOpencodeClient, type OpencodeClient } from "@opencode-ai/sdk";
import { spawnSandboxAgent, buildSandboxAgent, type SandboxAgentHandle } from "./helpers/spawn";
const waitForOpen = (socket: WebSocket) =>
new Promise<void>((resolve, reject) => {
socket.once("open", () => resolve());
socket.once("error", (err) => reject(err));
});
const waitForMessage = (socket: WebSocket, predicate: (text: string) => boolean, timeoutMs = 5000) =>
new Promise<string>((resolve, reject) => {
const timer = setTimeout(() => {
socket.off("message", onMessage);
reject(new Error("Timed out waiting for PTY output"));
}, timeoutMs);
const onMessage = (data: WebSocket.RawData) => {
const text = typeof data === "string" ? data : data.toString("utf8");
if (predicate(text)) {
clearTimeout(timer);
socket.off("message", onMessage);
resolve(text);
}
};
socket.on("message", onMessage);
});
describe("OpenCode-compatible PTY API", () => {
let handle: SandboxAgentHandle;
let client: OpencodeClient;
beforeAll(async () => {
await buildSandboxAgent();
});
beforeEach(async () => {
handle = await spawnSandboxAgent({ opencodeCompat: true });
client = createOpencodeClient({
baseUrl: `${handle.baseUrl}/opencode`,
headers: { Authorization: `Bearer ${handle.token}` },
});
});
afterEach(async () => {
await handle?.dispose();
});
it("should create/list/get/update/delete PTYs", async () => {
const created = await client.pty.create({
body: { command: "cat", title: "Echo" },
});
const ptyId = created.data?.id;
expect(ptyId).toBeDefined();
const list = await client.pty.list();
expect(list.data?.some((pty) => pty.id === ptyId)).toBe(true);
const fetched = await client.pty.get({ path: { ptyID: ptyId! } });
expect(fetched.data?.id).toBe(ptyId);
await client.pty.update({
path: { ptyID: ptyId! },
body: { title: "Updated" },
});
const updated = await client.pty.get({ path: { ptyID: ptyId! } });
expect(updated.data?.title).toBe("Updated");
await client.pty.remove({ path: { ptyID: ptyId! } });
const deleted = await client.pty.get({ path: { ptyID: ptyId! } });
expect(deleted.error).toBeDefined();
});
it("should stream PTY output and accept input", async () => {
const created = await client.pty.create({
body: { command: "cat" },
});
const ptyId = created.data?.id;
expect(ptyId).toBeDefined();
const wsUrl = new URL(`/opencode/pty/${ptyId}/connect`, handle.baseUrl);
wsUrl.protocol = wsUrl.protocol === "https:" ? "wss:" : "ws:";
const socket = new WebSocket(wsUrl.toString(), {
headers: { Authorization: `Bearer ${handle.token}` },
});
await waitForOpen(socket);
socket.send("hello\n");
const output = await waitForMessage(socket, (text) => text.includes("hello"));
expect(output).toContain("hello");
socket.close();
});
});

1
target Symbolic link
View file

@ -0,0 +1 @@
/home/nathan/sandbox-agent/target