diff --git a/.turbo b/.turbo new file mode 120000 index 0000000..0b7d9ca --- /dev/null +++ b/.turbo @@ -0,0 +1 @@ +/home/nathan/sandbox-agent/.turbo \ No newline at end of file diff --git a/dist b/dist new file mode 120000 index 0000000..f02d77f --- /dev/null +++ b/dist @@ -0,0 +1 @@ +/home/nathan/sandbox-agent/dist \ No newline at end of file diff --git a/node_modules b/node_modules new file mode 120000 index 0000000..501480b --- /dev/null +++ b/node_modules @@ -0,0 +1 @@ +/home/nathan/sandbox-agent/node_modules \ No newline at end of file diff --git a/server/packages/sandbox-agent/src/opencode_compat.rs b/server/packages/sandbox-agent/src/opencode_compat.rs index 55b7050..a8ddeda 100644 --- a/server/packages/sandbox-agent/src/opencode_compat.rs +++ b/server/packages/sandbox-agent/src/opencode_compat.rs @@ -6,6 +6,7 @@ use std::collections::HashMap; use std::convert::Infallible; +use std::path::PathBuf; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::str::FromStr; @@ -23,7 +24,7 @@ use tokio::sync::{broadcast, Mutex}; use tokio::time::interval; use utoipa::{IntoParams, OpenApi, ToSchema}; -use crate::router::{AppState, CreateSessionRequest, PermissionReply}; +use crate::router::{AppState, CommandExecutionRequest, CreateSessionRequest, PermissionReply}; use sandbox_agent_error::SandboxError; use sandbox_agent_agent_management::agents::AgentId; use sandbox_agent_universal_agent_schema::{ @@ -942,6 +943,25 @@ fn normalize_part(session_id: &str, message_id: &str, input: &Value) -> Value { } } +fn split_arguments(arguments: &str) -> Vec { + arguments + .split_whitespace() + .filter(|value| !value.is_empty()) + .map(|value| value.to_string()) + .collect() +} + +fn combine_output(stdout: &str, stderr: &str) -> String { + let mut combined = stdout.to_string(); + if !stderr.is_empty() { + if !combined.is_empty() { + combined.push('\n'); + } + combined.push_str(stderr); + } + combined +} + fn message_id_for_sequence(sequence: u64) -> String { format!("msg_{:020}", sequence) } @@ -2234,7 +2254,23 @@ async fn oc_agent_list(State(state): State>) -> impl IntoR tag = "opencode" )] async fn oc_command_list() -> impl IntoResponse { - (StatusCode::OK, Json(json!([]))) + let commands = vec![ + json!({ + "name": "command", + "description": "Run a command with arguments", + "source": "command", + "template": "{{command}} {{arguments}}", + "hints": ["command", "arguments"], + }), + json!({ + "name": "shell", + "description": "Run a shell command", + "source": "command", + "template": "{{command}}", + "hints": ["command"], + }), + ]; + (StatusCode::OK, Json(json!(commands))) } #[utoipa::path( @@ -3162,27 +3198,111 @@ async fn oc_session_command( return bad_request("command and arguments are required").into_response(); } let directory = state.opencode.directory_for(&headers, query.directory.as_ref()); + let _ = state + .opencode + .ensure_session(&session_id, directory.clone()) + .await; let worktree = state.opencode.worktree_for(&directory); let now = state.opencode.now_ms(); - let assistant_message_id = next_id("msg_", &MESSAGE_COUNTER); - let agent = normalize_agent_mode(body.agent.clone()); + let agent_mode = normalize_agent_mode(body.agent.clone()); + let requested_model = body.model.as_deref(); + let (session_agent, provider_id, model_id) = resolve_session_agent( + &state, + &session_id, + Some(OPENCODE_PROVIDER_ID), + requested_model, + ) + .await; + + state.opencode.emit_event(json!({ + "type": "session.status", + "properties": { + "sessionID": session_id, + "status": {"type": "busy"} + } + })); + + let _ = state + .opencode + .update_runtime(&session_id, |runtime| { + runtime.last_agent = Some(agent_mode.clone()); + runtime.last_model_provider = Some(provider_id.clone()); + runtime.last_model_id = Some(model_id.clone()); + }) + .await; + + if let Err(err) = ensure_backing_session(&state, &session_id, &session_agent).await { + return sandbox_error_response(err).into_response(); + } + ensure_session_stream(state.clone(), session_id.clone()).await; + + let command = body.command.unwrap_or_default(); + let args = split_arguments(body.arguments.as_deref().unwrap_or("")); + let result = match state + .inner + .session_manager() + .execute_command( + session_id.clone(), + CommandExecutionRequest { + command, + args, + cwd: Some(PathBuf::from(directory.clone())), + env: None, + timeout_ms: None, + stream_output: true, + shell: false, + }, + ) + .await + { + Ok(result) => result, + Err(err) => return sandbox_error_response(err).into_response(), + }; + + let mut parent_id: Option = None; + let runtime = state + .opencode + .update_runtime(&session_id, |runtime| { + parent_id = runtime.last_user_message_id.clone(); + }) + .await; + let message_id = runtime + .message_id_for_item + .get(&result.item_id) + .cloned() + .unwrap_or_else(|| unique_assistant_message_id(&runtime, parent_id.as_ref(), result.sequence)); + let _ = state + .opencode + .update_runtime(&session_id, |runtime| { + runtime + .message_id_for_item + .insert(result.item_id.clone(), message_id.clone()); + }) + .await; + let assistant_message = build_assistant_message( &session_id, - &assistant_message_id, - "msg_parent", + &message_id, + parent_id.as_deref().unwrap_or(""), now, &directory, &worktree, - &agent, - OPENCODE_PROVIDER_ID, - OPENCODE_DEFAULT_MODEL_ID, + &agent_mode, + &provider_id, + &model_id, ); + let output_text = combine_output(&result.stdout, &result.stderr); + let parts = if output_text.is_empty() { + Vec::new() + } else { + vec![build_text_part(&session_id, &message_id, &output_text)] + }; ( StatusCode::OK, Json(json!({ "info": assistant_message, - "parts": [], + "parts": parts, })), ) .into_response() @@ -3207,27 +3327,101 @@ async fn oc_session_shell( return bad_request("agent and command are required").into_response(); } let directory = state.opencode.directory_for(&headers, query.directory.as_ref()); + let _ = state + .opencode + .ensure_session(&session_id, directory.clone()) + .await; let worktree = state.opencode.worktree_for(&directory); let now = state.opencode.now_ms(); - let assistant_message_id = next_id("msg_", &MESSAGE_COUNTER); + let agent_mode = normalize_agent_mode(body.agent.clone()); + let requested_provider = body + .model + .as_ref() + .and_then(|v| v.get("providerID")) + .and_then(|v| v.as_str()); + let requested_model = body + .model + .as_ref() + .and_then(|v| v.get("modelID")) + .and_then(|v| v.as_str()); + let (session_agent, provider_id, model_id) = + resolve_session_agent(&state, &session_id, requested_provider, requested_model).await; + + state.opencode.emit_event(json!({ + "type": "session.status", + "properties": { + "sessionID": session_id, + "status": {"type": "busy"} + } + })); + + let _ = state + .opencode + .update_runtime(&session_id, |runtime| { + runtime.last_agent = Some(agent_mode.clone()); + runtime.last_model_provider = Some(provider_id.clone()); + runtime.last_model_id = Some(model_id.clone()); + }) + .await; + + if let Err(err) = ensure_backing_session(&state, &session_id, &session_agent).await { + return sandbox_error_response(err).into_response(); + } + ensure_session_stream(state.clone(), session_id.clone()).await; + + let command = body.command.unwrap_or_default(); + let result = match state + .inner + .session_manager() + .execute_command( + session_id.clone(), + CommandExecutionRequest { + command, + args: Vec::new(), + cwd: Some(PathBuf::from(directory.clone())), + env: None, + timeout_ms: None, + stream_output: true, + shell: true, + }, + ) + .await + { + Ok(result) => result, + Err(err) => return sandbox_error_response(err).into_response(), + }; + + let mut parent_id: Option = None; + let runtime = state + .opencode + .update_runtime(&session_id, |runtime| { + parent_id = runtime.last_user_message_id.clone(); + }) + .await; + let message_id = runtime + .message_id_for_item + .get(&result.item_id) + .cloned() + .unwrap_or_else(|| unique_assistant_message_id(&runtime, parent_id.as_ref(), result.sequence)); + let _ = state + .opencode + .update_runtime(&session_id, |runtime| { + runtime + .message_id_for_item + .insert(result.item_id.clone(), message_id.clone()); + }) + .await; + let assistant_message = build_assistant_message( &session_id, - &assistant_message_id, - "msg_parent", + &message_id, + parent_id.as_deref().unwrap_or(""), now, &directory, &worktree, - &normalize_agent_mode(body.agent.clone()), - body.model - .as_ref() - .and_then(|v| v.get("providerID")) - .and_then(|v| v.as_str()) - .unwrap_or(OPENCODE_PROVIDER_ID), - body.model - .as_ref() - .and_then(|v| v.get("modelID")) - .and_then(|v| v.as_str()) - .unwrap_or(OPENCODE_DEFAULT_MODEL_ID), + &agent_mode, + &provider_id, + &model_id, ); (StatusCode::OK, Json(assistant_message)).into_response() } diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index 3ca437a..aa2367a 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -30,6 +30,7 @@ use sandbox_agent_universal_agent_schema::{ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; +use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader}; use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use tokio::time::sleep; use tokio_stream::wrappers::BroadcastStream; @@ -937,6 +938,29 @@ pub(crate) struct PendingQuestionInfo { pub options: Vec, } +#[derive(Debug, Clone)] +pub(crate) struct CommandExecutionRequest { + pub command: String, + pub args: Vec, + pub cwd: Option, + pub env: Option>, + pub timeout_ms: Option, + pub stream_output: bool, + pub shell: bool, +} + +#[derive(Debug, Clone)] +pub(crate) struct CommandExecutionResult { + pub stdout: String, + pub stderr: String, + pub exit_code: Option, + pub duration_ms: u128, + pub timed_out: bool, + pub item_id: String, + pub sequence: u64, + pub status: ItemStatus, +} + impl ManagedServer { fn base_url(&self) -> Option { match &self.kind { @@ -1772,6 +1796,304 @@ impl SessionManager { Ok(()) } + pub(crate) async fn execute_command( + self: &Arc, + session_id: String, + request: CommandExecutionRequest, + ) -> Result { + let _ = self.session_snapshot_for_message(&session_id).await?; + let start_time = Instant::now(); + + let CommandExecutionRequest { + command, + args, + cwd, + env, + timeout_ms, + stream_output, + shell, + } = request; + + let started_item = UniversalItem { + item_id: String::new(), + native_item_id: None, + parent_id: None, + kind: ItemKind::Message, + role: Some(ItemRole::Assistant), + content: Vec::new(), + status: ItemStatus::InProgress, + }; + let started_conversion = EventConversion::new( + UniversalEventType::ItemStarted, + UniversalEventData::Item(ItemEventData { + item: started_item, + }), + ) + .synthetic(); + + let started_events = self + .record_conversions(&session_id, vec![started_conversion]) + .await?; + let (item_id, sequence) = started_events + .iter() + .find_map(|event| match &event.data { + UniversalEventData::Item(ItemEventData { item }) => { + if event.event_type == UniversalEventType::ItemStarted { + Some((item.item_id.clone(), event.sequence)) + } else { + None + } + } + _ => None, + }) + .ok_or_else(|| SandboxError::InvalidRequest { + message: "command execution could not allocate item id".to_string(), + })?; + + let (command, args) = if shell { + if cfg!(windows) { + ( + "cmd".to_string(), + vec!["/C".to_string(), command], + ) + } else { + ( + "sh".to_string(), + vec!["-c".to_string(), command], + ) + } + } else { + (command, args) + }; + + let mut cmd = tokio::process::Command::new(command.clone()); + cmd.args(args.clone()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + if let Some(cwd) = cwd.clone() { + cmd.current_dir(cwd); + } + if let Some(env) = env.clone() { + cmd.envs(env); + } + + let mut child = match cmd.spawn() { + Ok(child) => child, + Err(err) => { + let duration_ms = start_time.elapsed().as_millis(); + let error_message = format!("failed to spawn command: {err}"); + let details = json!({ + "command": command.clone(), + "args": args.clone(), + "cwd": cwd, + "error": error_message, + "duration_ms": duration_ms, + "timed_out": false, + "timeout_ms": timeout_ms, + }); + let completed_item = UniversalItem { + item_id: item_id.clone(), + native_item_id: None, + parent_id: None, + kind: ItemKind::Message, + role: Some(ItemRole::Assistant), + content: vec![ContentPart::Text { + text: "Command spawn failed".to_string(), + }], + status: ItemStatus::Failed, + }; + let completed = EventConversion::new( + UniversalEventType::ItemCompleted, + UniversalEventData::Item(ItemEventData { item: completed_item }), + ) + .synthetic(); + let _ = self.record_conversions(&session_id, vec![completed]).await; + self.record_error( + &session_id, + "command execution failed".to_string(), + Some("command_failed".to_string()), + Some(details), + ) + .await; + return Err(SandboxError::InvalidRequest { + message: error_message, + }); + } + }; + + let mut stdout = String::new(); + let mut stderr = String::new(); + let mut timed_out = false; + + let (tx, mut rx) = mpsc::unbounded_channel::<(bool, String)>(); + if let Some(out) = child.stdout.take() { + let tx_out = tx.clone(); + tokio::spawn(async move { + let mut reader = TokioBufReader::new(out); + let mut buffer = Vec::new(); + loop { + buffer.clear(); + match reader.read_until(b'\n', &mut buffer).await { + Ok(0) | Err(_) => break, + Ok(_) => { + let chunk = String::from_utf8_lossy(&buffer).to_string(); + let _ = tx_out.send((true, chunk)); + } + } + } + }); + } + if let Some(err) = child.stderr.take() { + let tx_err = tx.clone(); + tokio::spawn(async move { + let mut reader = TokioBufReader::new(err); + let mut buffer = Vec::new(); + loop { + buffer.clear(); + match reader.read_until(b'\n', &mut buffer).await { + Ok(0) | Err(_) => break, + Ok(_) => { + let chunk = String::from_utf8_lossy(&buffer).to_string(); + let _ = tx_err.send((false, chunk)); + } + } + } + }); + } + drop(tx); + + let mut status: Option = None; + let mut wait_fut = Box::pin(child.wait()); + let mut timeout_sleep = timeout_ms + .map(|ms| tokio::time::sleep(Duration::from_millis(ms))); + + loop { + tokio::select! { + result = &mut wait_fut, if status.is_none() => { + status = Some(result.map_err(|err| SandboxError::InvalidRequest { message: err.to_string() })?); + break; + } + chunk = rx.recv() => { + if let Some((is_stdout, text)) = chunk { + if is_stdout { + stdout.push_str(&text); + } else { + stderr.push_str(&text); + } + if stream_output { + let delta = EventConversion::new( + UniversalEventType::ItemDelta, + UniversalEventData::ItemDelta(ItemDeltaData { + item_id: item_id.clone(), + native_item_id: None, + delta: text, + }), + ) + .synthetic(); + let _ = self.record_conversions(&session_id, vec![delta]).await; + } + } else if status.is_some() { + break; + } + } + _ = timeout_sleep.as_mut().unwrap(), if timeout_sleep.is_some() && status.is_none() => { + timed_out = true; + let _ = child.kill().await; + status = Some(wait_fut.await.map_err(|err| SandboxError::InvalidRequest { message: err.to_string() })?); + break; + } + } + } + + while let Some((is_stdout, text)) = rx.recv().await { + if is_stdout { + stdout.push_str(&text); + } else { + stderr.push_str(&text); + } + if stream_output { + let delta = EventConversion::new( + UniversalEventType::ItemDelta, + UniversalEventData::ItemDelta(ItemDeltaData { + item_id: item_id.clone(), + native_item_id: None, + delta: text, + }), + ) + .synthetic(); + let _ = self.record_conversions(&session_id, vec![delta]).await; + } + } + + let duration_ms = start_time.elapsed().as_millis(); + let exit_code = status.and_then(|value| value.code()); + let mut combined = stdout.clone(); + if !stderr.is_empty() { + if !combined.is_empty() { + combined.push('\n'); + } + combined.push_str(&stderr); + } + let failed = timed_out || exit_code.unwrap_or(0) != 0; + let status_value = if failed { + ItemStatus::Failed + } else { + ItemStatus::Completed + }; + + let mut content = Vec::new(); + if !combined.is_empty() { + content.push(ContentPart::Text { text: combined }); + } + let completed_item = UniversalItem { + item_id: item_id.clone(), + native_item_id: None, + parent_id: None, + kind: ItemKind::Message, + role: Some(ItemRole::Assistant), + content, + status: status_value.clone(), + }; + let completed = EventConversion::new( + UniversalEventType::ItemCompleted, + UniversalEventData::Item(ItemEventData { item: completed_item }), + ) + .synthetic(); + let _ = self.record_conversions(&session_id, vec![completed]).await; + + if failed { + let details = json!({ + "command": command, + "args": args, + "cwd": cwd, + "exit_code": exit_code, + "duration_ms": duration_ms, + "stdout": stdout, + "stderr": stderr, + "timed_out": timed_out, + "timeout_ms": timeout_ms, + }); + self.record_error( + &session_id, + "command execution failed".to_string(), + Some("command_failed".to_string()), + Some(details), + ) + .await; + } + + Ok(CommandExecutionResult { + stdout, + stderr, + exit_code, + duration_ms, + timed_out, + item_id, + sequence, + status: status_value, + }) + } + async fn emit_synthetic_assistant_start(&self, session_id: &str) -> Result<(), SandboxError> { let conversion = { let mut sessions = self.sessions.lock().await; diff --git a/server/packages/sandbox-agent/tests/opencode-compat/command.test.ts b/server/packages/sandbox-agent/tests/opencode-compat/command.test.ts new file mode 100644 index 0000000..b3a0e28 --- /dev/null +++ b/server/packages/sandbox-agent/tests/opencode-compat/command.test.ts @@ -0,0 +1,117 @@ +/** + * Tests for OpenCode-compatible command/shell execution endpoints. + */ + +import { describe, it, expect, beforeAll, beforeEach, afterEach } from "vitest"; +import { createOpencodeClient, type OpencodeClient } from "@opencode-ai/sdk"; +import { spawnSandboxAgent, buildSandboxAgent, type SandboxAgentHandle } from "./helpers/spawn"; + +describe("OpenCode-compatible Command Execution", () => { + let handle: SandboxAgentHandle; + let client: OpencodeClient; + let sessionId: string; + + beforeAll(async () => { + await buildSandboxAgent(); + }); + + beforeEach(async () => { + handle = await spawnSandboxAgent({ opencodeCompat: true }); + client = createOpencodeClient({ + baseUrl: `${handle.baseUrl}/opencode`, + headers: { Authorization: `Bearer ${handle.token}` }, + }); + + const session = await client.session.create(); + sessionId = session.data?.id!; + expect(sessionId).toBeDefined(); + }); + + afterEach(async () => { + await handle?.dispose(); + }); + + it("session.command should return output and emit events", async () => { + const events: any[] = []; + const eventStream = await client.event.subscribe(); + + const waitForOutput = new Promise((resolve) => { + const timeout = setTimeout(resolve, 10000); + (async () => { + try { + for await (const event of (eventStream as any).stream) { + events.push(event); + if (event.type === "message.part.updated") { + const text = event?.properties?.part?.text ?? ""; + if (text.includes("hello-command")) { + clearTimeout(timeout); + resolve(); + break; + } + } + } + } catch { + // Stream ended + } + })(); + }); + + const response = await client.session.command({ + path: { id: sessionId }, + body: { + agent: "build", + model: "mock", + command: "echo", + arguments: "hello-command", + }, + }); + + expect(response.error).toBeUndefined(); + const parts = (response.data as any)?.parts ?? []; + expect(parts.length).toBeGreaterThan(0); + expect(parts[0]?.text ?? "").toContain("hello-command"); + + await waitForOutput; + expect(events.length).toBeGreaterThan(0); + }); + + it("session.shell should emit output events", async () => { + const events: any[] = []; + const eventStream = await client.event.subscribe(); + + const waitForOutput = new Promise((resolve) => { + const timeout = setTimeout(resolve, 10000); + (async () => { + try { + for await (const event of (eventStream as any).stream) { + events.push(event); + if (event.type === "message.part.updated") { + const text = event?.properties?.part?.text ?? ""; + if (text.includes("hello-shell")) { + clearTimeout(timeout); + resolve(); + break; + } + } + } + } catch { + // Stream ended + } + })(); + }); + + const response = await client.session.shell({ + path: { id: sessionId }, + body: { + agent: "build", + model: { providerID: "sandbox-agent", modelID: "mock" }, + command: "echo hello-shell", + }, + }); + + expect(response.error).toBeUndefined(); + + await waitForOutput; + expect(events.length).toBeGreaterThan(0); + }); +}); diff --git a/target b/target new file mode 120000 index 0000000..3d6ad8c --- /dev/null +++ b/target @@ -0,0 +1 @@ +/home/nathan/sandbox-agent/target \ No newline at end of file