feat: implement opencode command and shell execution

This commit is contained in:
Nathan Flurry 2026-02-04 14:28:55 -08:00
parent 7378abee46
commit 668744d6ec
7 changed files with 661 additions and 24 deletions

1
.turbo Symbolic link
View file

@ -0,0 +1 @@
/home/nathan/sandbox-agent/.turbo

1
dist Symbolic link
View file

@ -0,0 +1 @@
/home/nathan/sandbox-agent/dist

1
node_modules Symbolic link
View file

@ -0,0 +1 @@
/home/nathan/sandbox-agent/node_modules

View file

@ -6,6 +6,7 @@
use std::collections::HashMap;
use std::convert::Infallible;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::str::FromStr;
@ -23,7 +24,7 @@ use tokio::sync::{broadcast, Mutex};
use tokio::time::interval;
use utoipa::{IntoParams, OpenApi, ToSchema};
use crate::router::{AppState, CreateSessionRequest, PermissionReply};
use crate::router::{AppState, CommandExecutionRequest, CreateSessionRequest, PermissionReply};
use sandbox_agent_error::SandboxError;
use sandbox_agent_agent_management::agents::AgentId;
use sandbox_agent_universal_agent_schema::{
@ -942,6 +943,25 @@ fn normalize_part(session_id: &str, message_id: &str, input: &Value) -> Value {
}
}
fn split_arguments(arguments: &str) -> Vec<String> {
arguments
.split_whitespace()
.filter(|value| !value.is_empty())
.map(|value| value.to_string())
.collect()
}
fn combine_output(stdout: &str, stderr: &str) -> String {
let mut combined = stdout.to_string();
if !stderr.is_empty() {
if !combined.is_empty() {
combined.push('\n');
}
combined.push_str(stderr);
}
combined
}
fn message_id_for_sequence(sequence: u64) -> String {
format!("msg_{:020}", sequence)
}
@ -2234,7 +2254,23 @@ async fn oc_agent_list(State(state): State<Arc<OpenCodeAppState>>) -> impl IntoR
tag = "opencode"
)]
async fn oc_command_list() -> impl IntoResponse {
(StatusCode::OK, Json(json!([])))
let commands = vec![
json!({
"name": "command",
"description": "Run a command with arguments",
"source": "command",
"template": "{{command}} {{arguments}}",
"hints": ["command", "arguments"],
}),
json!({
"name": "shell",
"description": "Run a shell command",
"source": "command",
"template": "{{command}}",
"hints": ["command"],
}),
];
(StatusCode::OK, Json(json!(commands)))
}
#[utoipa::path(
@ -3162,27 +3198,111 @@ async fn oc_session_command(
return bad_request("command and arguments are required").into_response();
}
let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
let _ = state
.opencode
.ensure_session(&session_id, directory.clone())
.await;
let worktree = state.opencode.worktree_for(&directory);
let now = state.opencode.now_ms();
let assistant_message_id = next_id("msg_", &MESSAGE_COUNTER);
let agent = normalize_agent_mode(body.agent.clone());
let agent_mode = normalize_agent_mode(body.agent.clone());
let requested_model = body.model.as_deref();
let (session_agent, provider_id, model_id) = resolve_session_agent(
&state,
&session_id,
Some(OPENCODE_PROVIDER_ID),
requested_model,
)
.await;
state.opencode.emit_event(json!({
"type": "session.status",
"properties": {
"sessionID": session_id,
"status": {"type": "busy"}
}
}));
let _ = state
.opencode
.update_runtime(&session_id, |runtime| {
runtime.last_agent = Some(agent_mode.clone());
runtime.last_model_provider = Some(provider_id.clone());
runtime.last_model_id = Some(model_id.clone());
})
.await;
if let Err(err) = ensure_backing_session(&state, &session_id, &session_agent).await {
return sandbox_error_response(err).into_response();
}
ensure_session_stream(state.clone(), session_id.clone()).await;
let command = body.command.unwrap_or_default();
let args = split_arguments(body.arguments.as_deref().unwrap_or(""));
let result = match state
.inner
.session_manager()
.execute_command(
session_id.clone(),
CommandExecutionRequest {
command,
args,
cwd: Some(PathBuf::from(directory.clone())),
env: None,
timeout_ms: None,
stream_output: true,
shell: false,
},
)
.await
{
Ok(result) => result,
Err(err) => return sandbox_error_response(err).into_response(),
};
let mut parent_id: Option<String> = None;
let runtime = state
.opencode
.update_runtime(&session_id, |runtime| {
parent_id = runtime.last_user_message_id.clone();
})
.await;
let message_id = runtime
.message_id_for_item
.get(&result.item_id)
.cloned()
.unwrap_or_else(|| unique_assistant_message_id(&runtime, parent_id.as_ref(), result.sequence));
let _ = state
.opencode
.update_runtime(&session_id, |runtime| {
runtime
.message_id_for_item
.insert(result.item_id.clone(), message_id.clone());
})
.await;
let assistant_message = build_assistant_message(
&session_id,
&assistant_message_id,
"msg_parent",
&message_id,
parent_id.as_deref().unwrap_or(""),
now,
&directory,
&worktree,
&agent,
OPENCODE_PROVIDER_ID,
OPENCODE_DEFAULT_MODEL_ID,
&agent_mode,
&provider_id,
&model_id,
);
let output_text = combine_output(&result.stdout, &result.stderr);
let parts = if output_text.is_empty() {
Vec::new()
} else {
vec![build_text_part(&session_id, &message_id, &output_text)]
};
(
StatusCode::OK,
Json(json!({
"info": assistant_message,
"parts": [],
"parts": parts,
})),
)
.into_response()
@ -3207,27 +3327,101 @@ async fn oc_session_shell(
return bad_request("agent and command are required").into_response();
}
let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
let _ = state
.opencode
.ensure_session(&session_id, directory.clone())
.await;
let worktree = state.opencode.worktree_for(&directory);
let now = state.opencode.now_ms();
let assistant_message_id = next_id("msg_", &MESSAGE_COUNTER);
let agent_mode = normalize_agent_mode(body.agent.clone());
let requested_provider = body
.model
.as_ref()
.and_then(|v| v.get("providerID"))
.and_then(|v| v.as_str());
let requested_model = body
.model
.as_ref()
.and_then(|v| v.get("modelID"))
.and_then(|v| v.as_str());
let (session_agent, provider_id, model_id) =
resolve_session_agent(&state, &session_id, requested_provider, requested_model).await;
state.opencode.emit_event(json!({
"type": "session.status",
"properties": {
"sessionID": session_id,
"status": {"type": "busy"}
}
}));
let _ = state
.opencode
.update_runtime(&session_id, |runtime| {
runtime.last_agent = Some(agent_mode.clone());
runtime.last_model_provider = Some(provider_id.clone());
runtime.last_model_id = Some(model_id.clone());
})
.await;
if let Err(err) = ensure_backing_session(&state, &session_id, &session_agent).await {
return sandbox_error_response(err).into_response();
}
ensure_session_stream(state.clone(), session_id.clone()).await;
let command = body.command.unwrap_or_default();
let result = match state
.inner
.session_manager()
.execute_command(
session_id.clone(),
CommandExecutionRequest {
command,
args: Vec::new(),
cwd: Some(PathBuf::from(directory.clone())),
env: None,
timeout_ms: None,
stream_output: true,
shell: true,
},
)
.await
{
Ok(result) => result,
Err(err) => return sandbox_error_response(err).into_response(),
};
let mut parent_id: Option<String> = None;
let runtime = state
.opencode
.update_runtime(&session_id, |runtime| {
parent_id = runtime.last_user_message_id.clone();
})
.await;
let message_id = runtime
.message_id_for_item
.get(&result.item_id)
.cloned()
.unwrap_or_else(|| unique_assistant_message_id(&runtime, parent_id.as_ref(), result.sequence));
let _ = state
.opencode
.update_runtime(&session_id, |runtime| {
runtime
.message_id_for_item
.insert(result.item_id.clone(), message_id.clone());
})
.await;
let assistant_message = build_assistant_message(
&session_id,
&assistant_message_id,
"msg_parent",
&message_id,
parent_id.as_deref().unwrap_or(""),
now,
&directory,
&worktree,
&normalize_agent_mode(body.agent.clone()),
body.model
.as_ref()
.and_then(|v| v.get("providerID"))
.and_then(|v| v.as_str())
.unwrap_or(OPENCODE_PROVIDER_ID),
body.model
.as_ref()
.and_then(|v| v.get("modelID"))
.and_then(|v| v.as_str())
.unwrap_or(OPENCODE_DEFAULT_MODEL_ID),
&agent_mode,
&provider_id,
&model_id,
);
(StatusCode::OK, Json(assistant_message)).into_response()
}

View file

@ -30,6 +30,7 @@ use sandbox_agent_universal_agent_schema::{
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader};
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use tokio::time::sleep;
use tokio_stream::wrappers::BroadcastStream;
@ -937,6 +938,29 @@ pub(crate) struct PendingQuestionInfo {
pub options: Vec<String>,
}
#[derive(Debug, Clone)]
pub(crate) struct CommandExecutionRequest {
pub command: String,
pub args: Vec<String>,
pub cwd: Option<PathBuf>,
pub env: Option<HashMap<String, String>>,
pub timeout_ms: Option<u64>,
pub stream_output: bool,
pub shell: bool,
}
#[derive(Debug, Clone)]
pub(crate) struct CommandExecutionResult {
pub stdout: String,
pub stderr: String,
pub exit_code: Option<i32>,
pub duration_ms: u128,
pub timed_out: bool,
pub item_id: String,
pub sequence: u64,
pub status: ItemStatus,
}
impl ManagedServer {
fn base_url(&self) -> Option<String> {
match &self.kind {
@ -1772,6 +1796,304 @@ impl SessionManager {
Ok(())
}
pub(crate) async fn execute_command(
self: &Arc<Self>,
session_id: String,
request: CommandExecutionRequest,
) -> Result<CommandExecutionResult, SandboxError> {
let _ = self.session_snapshot_for_message(&session_id).await?;
let start_time = Instant::now();
let CommandExecutionRequest {
command,
args,
cwd,
env,
timeout_ms,
stream_output,
shell,
} = request;
let started_item = UniversalItem {
item_id: String::new(),
native_item_id: None,
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content: Vec::new(),
status: ItemStatus::InProgress,
};
let started_conversion = EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData {
item: started_item,
}),
)
.synthetic();
let started_events = self
.record_conversions(&session_id, vec![started_conversion])
.await?;
let (item_id, sequence) = started_events
.iter()
.find_map(|event| match &event.data {
UniversalEventData::Item(ItemEventData { item }) => {
if event.event_type == UniversalEventType::ItemStarted {
Some((item.item_id.clone(), event.sequence))
} else {
None
}
}
_ => None,
})
.ok_or_else(|| SandboxError::InvalidRequest {
message: "command execution could not allocate item id".to_string(),
})?;
let (command, args) = if shell {
if cfg!(windows) {
(
"cmd".to_string(),
vec!["/C".to_string(), command],
)
} else {
(
"sh".to_string(),
vec!["-c".to_string(), command],
)
}
} else {
(command, args)
};
let mut cmd = tokio::process::Command::new(command.clone());
cmd.args(args.clone())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
if let Some(cwd) = cwd.clone() {
cmd.current_dir(cwd);
}
if let Some(env) = env.clone() {
cmd.envs(env);
}
let mut child = match cmd.spawn() {
Ok(child) => child,
Err(err) => {
let duration_ms = start_time.elapsed().as_millis();
let error_message = format!("failed to spawn command: {err}");
let details = json!({
"command": command.clone(),
"args": args.clone(),
"cwd": cwd,
"error": error_message,
"duration_ms": duration_ms,
"timed_out": false,
"timeout_ms": timeout_ms,
});
let completed_item = UniversalItem {
item_id: item_id.clone(),
native_item_id: None,
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content: vec![ContentPart::Text {
text: "Command spawn failed".to_string(),
}],
status: ItemStatus::Failed,
};
let completed = EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item: completed_item }),
)
.synthetic();
let _ = self.record_conversions(&session_id, vec![completed]).await;
self.record_error(
&session_id,
"command execution failed".to_string(),
Some("command_failed".to_string()),
Some(details),
)
.await;
return Err(SandboxError::InvalidRequest {
message: error_message,
});
}
};
let mut stdout = String::new();
let mut stderr = String::new();
let mut timed_out = false;
let (tx, mut rx) = mpsc::unbounded_channel::<(bool, String)>();
if let Some(out) = child.stdout.take() {
let tx_out = tx.clone();
tokio::spawn(async move {
let mut reader = TokioBufReader::new(out);
let mut buffer = Vec::new();
loop {
buffer.clear();
match reader.read_until(b'\n', &mut buffer).await {
Ok(0) | Err(_) => break,
Ok(_) => {
let chunk = String::from_utf8_lossy(&buffer).to_string();
let _ = tx_out.send((true, chunk));
}
}
}
});
}
if let Some(err) = child.stderr.take() {
let tx_err = tx.clone();
tokio::spawn(async move {
let mut reader = TokioBufReader::new(err);
let mut buffer = Vec::new();
loop {
buffer.clear();
match reader.read_until(b'\n', &mut buffer).await {
Ok(0) | Err(_) => break,
Ok(_) => {
let chunk = String::from_utf8_lossy(&buffer).to_string();
let _ = tx_err.send((false, chunk));
}
}
}
});
}
drop(tx);
let mut status: Option<std::process::ExitStatus> = None;
let mut wait_fut = Box::pin(child.wait());
let mut timeout_sleep = timeout_ms
.map(|ms| tokio::time::sleep(Duration::from_millis(ms)));
loop {
tokio::select! {
result = &mut wait_fut, if status.is_none() => {
status = Some(result.map_err(|err| SandboxError::InvalidRequest { message: err.to_string() })?);
break;
}
chunk = rx.recv() => {
if let Some((is_stdout, text)) = chunk {
if is_stdout {
stdout.push_str(&text);
} else {
stderr.push_str(&text);
}
if stream_output {
let delta = EventConversion::new(
UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData {
item_id: item_id.clone(),
native_item_id: None,
delta: text,
}),
)
.synthetic();
let _ = self.record_conversions(&session_id, vec![delta]).await;
}
} else if status.is_some() {
break;
}
}
_ = timeout_sleep.as_mut().unwrap(), if timeout_sleep.is_some() && status.is_none() => {
timed_out = true;
let _ = child.kill().await;
status = Some(wait_fut.await.map_err(|err| SandboxError::InvalidRequest { message: err.to_string() })?);
break;
}
}
}
while let Some((is_stdout, text)) = rx.recv().await {
if is_stdout {
stdout.push_str(&text);
} else {
stderr.push_str(&text);
}
if stream_output {
let delta = EventConversion::new(
UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData {
item_id: item_id.clone(),
native_item_id: None,
delta: text,
}),
)
.synthetic();
let _ = self.record_conversions(&session_id, vec![delta]).await;
}
}
let duration_ms = start_time.elapsed().as_millis();
let exit_code = status.and_then(|value| value.code());
let mut combined = stdout.clone();
if !stderr.is_empty() {
if !combined.is_empty() {
combined.push('\n');
}
combined.push_str(&stderr);
}
let failed = timed_out || exit_code.unwrap_or(0) != 0;
let status_value = if failed {
ItemStatus::Failed
} else {
ItemStatus::Completed
};
let mut content = Vec::new();
if !combined.is_empty() {
content.push(ContentPart::Text { text: combined });
}
let completed_item = UniversalItem {
item_id: item_id.clone(),
native_item_id: None,
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content,
status: status_value.clone(),
};
let completed = EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item: completed_item }),
)
.synthetic();
let _ = self.record_conversions(&session_id, vec![completed]).await;
if failed {
let details = json!({
"command": command,
"args": args,
"cwd": cwd,
"exit_code": exit_code,
"duration_ms": duration_ms,
"stdout": stdout,
"stderr": stderr,
"timed_out": timed_out,
"timeout_ms": timeout_ms,
});
self.record_error(
&session_id,
"command execution failed".to_string(),
Some("command_failed".to_string()),
Some(details),
)
.await;
}
Ok(CommandExecutionResult {
stdout,
stderr,
exit_code,
duration_ms,
timed_out,
item_id,
sequence,
status: status_value,
})
}
async fn emit_synthetic_assistant_start(&self, session_id: &str) -> Result<(), SandboxError> {
let conversion = {
let mut sessions = self.sessions.lock().await;

View file

@ -0,0 +1,117 @@
/**
* Tests for OpenCode-compatible command/shell execution endpoints.
*/
import { describe, it, expect, beforeAll, beforeEach, afterEach } from "vitest";
import { createOpencodeClient, type OpencodeClient } from "@opencode-ai/sdk";
import { spawnSandboxAgent, buildSandboxAgent, type SandboxAgentHandle } from "./helpers/spawn";
describe("OpenCode-compatible Command Execution", () => {
let handle: SandboxAgentHandle;
let client: OpencodeClient;
let sessionId: string;
beforeAll(async () => {
await buildSandboxAgent();
});
beforeEach(async () => {
handle = await spawnSandboxAgent({ opencodeCompat: true });
client = createOpencodeClient({
baseUrl: `${handle.baseUrl}/opencode`,
headers: { Authorization: `Bearer ${handle.token}` },
});
const session = await client.session.create();
sessionId = session.data?.id!;
expect(sessionId).toBeDefined();
});
afterEach(async () => {
await handle?.dispose();
});
it("session.command should return output and emit events", async () => {
const events: any[] = [];
const eventStream = await client.event.subscribe();
const waitForOutput = new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, 10000);
(async () => {
try {
for await (const event of (eventStream as any).stream) {
events.push(event);
if (event.type === "message.part.updated") {
const text = event?.properties?.part?.text ?? "";
if (text.includes("hello-command")) {
clearTimeout(timeout);
resolve();
break;
}
}
}
} catch {
// Stream ended
}
})();
});
const response = await client.session.command({
path: { id: sessionId },
body: {
agent: "build",
model: "mock",
command: "echo",
arguments: "hello-command",
},
});
expect(response.error).toBeUndefined();
const parts = (response.data as any)?.parts ?? [];
expect(parts.length).toBeGreaterThan(0);
expect(parts[0]?.text ?? "").toContain("hello-command");
await waitForOutput;
expect(events.length).toBeGreaterThan(0);
});
it("session.shell should emit output events", async () => {
const events: any[] = [];
const eventStream = await client.event.subscribe();
const waitForOutput = new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, 10000);
(async () => {
try {
for await (const event of (eventStream as any).stream) {
events.push(event);
if (event.type === "message.part.updated") {
const text = event?.properties?.part?.text ?? "";
if (text.includes("hello-shell")) {
clearTimeout(timeout);
resolve();
break;
}
}
}
} catch {
// Stream ended
}
})();
});
const response = await client.session.shell({
path: { id: sessionId },
body: {
agent: "build",
model: { providerID: "sandbox-agent", modelID: "mock" },
command: "echo hello-shell",
},
});
expect(response.error).toBeUndefined();
await waitForOutput;
expect(events.length).toBeGreaterThan(0);
});
});

1
target Symbolic link
View file

@ -0,0 +1 @@
/home/nathan/sandbox-agent/target