diff --git a/engine/packages/agent-management/src/agents.rs b/engine/packages/agent-management/src/agents.rs new file mode 100644 index 0000000..9c68aba --- /dev/null +++ b/engine/packages/agent-management/src/agents.rs @@ -0,0 +1,1001 @@ +use std::collections::HashMap; +use std::fmt; +use std::fs; +use std::io::{self, Read}; +use std::path::{Path, PathBuf}; +use std::process::{Child, ChildStderr, ChildStdout, Command, ExitStatus, Stdio}; + +use flate2::read::GzDecoder; +use reqwest::blocking::Client; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use thiserror::Error; +use url::Url; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum AgentId { + Claude, + Codex, + Opencode, + Amp, +} + +impl AgentId { + pub fn as_str(self) -> &'static str { + match self { + AgentId::Claude => "claude", + AgentId::Codex => "codex", + AgentId::Opencode => "opencode", + AgentId::Amp => "amp", + } + } + + pub fn binary_name(self) -> &'static str { + match self { + AgentId::Claude => "claude", + AgentId::Codex => "codex", + AgentId::Opencode => "opencode", + AgentId::Amp => "amp", + } + } + + pub fn parse(value: &str) -> Option { + match value { + "claude" => Some(AgentId::Claude), + "codex" => Some(AgentId::Codex), + "opencode" => Some(AgentId::Opencode), + "amp" => Some(AgentId::Amp), + _ => None, + } + } +} + +impl fmt::Display for AgentId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Platform { + LinuxX64, + LinuxX64Musl, + LinuxArm64, + MacosArm64, + MacosX64, +} + +impl Platform { + pub fn detect() -> Result { + let os = std::env::consts::OS; + let arch = std::env::consts::ARCH; + let is_musl = cfg!(target_env = "musl"); + + match (os, arch, is_musl) { + ("linux", "x86_64", true) => Ok(Self::LinuxX64Musl), + ("linux", "x86_64", false) => Ok(Self::LinuxX64), + ("linux", "aarch64", _) => Ok(Self::LinuxArm64), + ("macos", "aarch64", _) => Ok(Self::MacosArm64), + ("macos", "x86_64", _) => Ok(Self::MacosX64), + _ => Err(AgentError::UnsupportedPlatform { + os: os.to_string(), + arch: arch.to_string(), + }), + } + } +} + +#[derive(Debug, Clone)] +pub struct AgentManager { + install_dir: PathBuf, + platform: Platform, +} + +impl AgentManager { + pub fn new(install_dir: impl Into) -> Result { + Ok(Self { + install_dir: install_dir.into(), + platform: Platform::detect()?, + }) + } + + pub fn with_platform( + install_dir: impl Into, + platform: Platform, + ) -> Self { + Self { + install_dir: install_dir.into(), + platform, + } + } + + pub fn install(&self, agent: AgentId, options: InstallOptions) -> Result { + let install_path = self.binary_path(agent); + if install_path.exists() && !options.reinstall { + return Ok(InstallResult { + path: install_path, + version: self.version(agent).unwrap_or(None), + }); + } + + fs::create_dir_all(&self.install_dir)?; + + match agent { + AgentId::Claude => install_claude(&install_path, self.platform, options.version.as_deref())?, + AgentId::Codex => install_codex(&install_path, self.platform, options.version.as_deref())?, + AgentId::Opencode => install_opencode(&install_path, self.platform, options.version.as_deref())?, + AgentId::Amp => install_amp(&install_path, self.platform, options.version.as_deref())?, + } + + Ok(InstallResult { + path: install_path, + version: self.version(agent).unwrap_or(None), + }) + } + + pub fn is_installed(&self, agent: AgentId) -> bool { + self.binary_path(agent).exists() || find_in_path(agent.binary_name()).is_some() + } + + pub fn binary_path(&self, agent: AgentId) -> PathBuf { + self.install_dir.join(agent.binary_name()) + } + + pub fn version(&self, agent: AgentId) -> Result, AgentError> { + let path = self.resolve_binary(agent)?; + let attempts = [vec!["--version"], vec!["version"], vec!["-V"]]; + for args in attempts { + let output = Command::new(&path).args(args).output(); + if let Ok(output) = output { + if output.status.success() { + if let Some(version) = parse_version_output(&output) { + return Ok(Some(version)); + } + } + } + } + Ok(None) + } + + pub fn spawn(&self, agent: AgentId, options: SpawnOptions) -> Result { + let path = self.resolve_binary(agent)?; + let working_dir = options + .working_dir + .clone() + .unwrap_or_else(|| std::env::current_dir().unwrap_or_default()); + let mut command = Command::new(&path); + command.current_dir(&working_dir); + + match agent { + AgentId::Claude => { + command + .arg("--print") + .arg("--output-format") + .arg("stream-json") + .arg("--verbose"); + if let Some(model) = options.model.as_deref() { + command.arg("--model").arg(model); + } + if let Some(session_id) = options.session_id.as_deref() { + command.arg("--resume").arg(session_id); + } + match options.permission_mode.as_deref() { + Some("plan") => { + command.arg("--permission-mode").arg("plan"); + } + Some("bypass") => { + command.arg("--dangerously-skip-permissions"); + } + _ => {} + } + command.arg(&options.prompt); + } + AgentId::Codex => { + if options.session_id.is_some() { + return Err(AgentError::ResumeUnsupported { agent }); + } + command + .arg("exec") + .arg("--json"); + match options.permission_mode.as_deref() { + Some("plan") => { + command.arg("--sandbox").arg("read-only"); + } + Some("bypass") => { + command.arg("--dangerously-bypass-approvals-and-sandbox"); + } + _ => {} + } + if let Some(model) = options.model.as_deref() { + command.arg("-m").arg(model); + } + let prompt = codex_prompt_for_mode(&options.prompt, options.agent_mode.as_deref()); + command.arg(prompt); + } + AgentId::Opencode => { + command + .arg("run") + .arg("--format") + .arg("json"); + if let Some(model) = options.model.as_deref() { + command.arg("-m").arg(model); + } + if let Some(agent_mode) = options.agent_mode.as_deref() { + command.arg("--agent").arg(agent_mode); + } + if let Some(variant) = options.variant.as_deref() { + command.arg("--variant").arg(variant); + } + if let Some(session_id) = options.session_id.as_deref() { + command.arg("-s").arg(session_id); + } + command.arg(&options.prompt); + } + AgentId::Amp => { + let output = spawn_amp(&path, &working_dir, &options)?; + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let events = parse_jsonl_from_outputs(&stdout, &stderr); + return Ok(SpawnResult { + status: output.status, + stdout, + stderr, + session_id: extract_session_id(agent, &events), + result: extract_result_text(agent, &events), + events, + }); + } + } + + for (key, value) in options.env { + command.env(key, value); + } + + let output = command.output().map_err(AgentError::Io)?; + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let events = parse_jsonl_from_outputs(&stdout, &stderr); + Ok(SpawnResult { + status: output.status, + stdout, + stderr, + session_id: extract_session_id(agent, &events), + result: extract_result_text(agent, &events), + events, + }) + } + + pub fn spawn_streaming( + &self, + agent: AgentId, + options: SpawnOptions, + ) -> Result { + let mut command = self.build_command(agent, &options)?; + command.stdout(Stdio::piped()).stderr(Stdio::piped()); + let mut child = command.spawn().map_err(AgentError::Io)?; + let stdout = child.stdout.take(); + let stderr = child.stderr.take(); + Ok(StreamingSpawn { child, stdout, stderr }) + } + + fn build_command(&self, agent: AgentId, options: &SpawnOptions) -> Result { + let path = self.resolve_binary(agent)?; + let working_dir = options + .working_dir + .clone() + .unwrap_or_else(|| std::env::current_dir().unwrap_or_default()); + let mut command = Command::new(&path); + command.current_dir(&working_dir); + + match agent { + AgentId::Claude => { + command + .arg("--print") + .arg("--output-format") + .arg("stream-json") + .arg("--verbose"); + if let Some(model) = options.model.as_deref() { + command.arg("--model").arg(model); + } + if let Some(session_id) = options.session_id.as_deref() { + command.arg("--resume").arg(session_id); + } + match options.permission_mode.as_deref() { + Some("plan") => { + command.arg("--permission-mode").arg("plan"); + } + Some("bypass") => { + command.arg("--dangerously-skip-permissions"); + } + _ => {} + } + command.arg(&options.prompt); + } + AgentId::Codex => { + if options.session_id.is_some() { + return Err(AgentError::ResumeUnsupported { agent }); + } + command.arg("exec").arg("--json"); + match options.permission_mode.as_deref() { + Some("plan") => { + command.arg("--sandbox").arg("read-only"); + } + Some("bypass") => { + command.arg("--dangerously-bypass-approvals-and-sandbox"); + } + _ => {} + } + if let Some(model) = options.model.as_deref() { + command.arg("-m").arg(model); + } + let prompt = codex_prompt_for_mode(&options.prompt, options.agent_mode.as_deref()); + command.arg(prompt); + } + AgentId::Opencode => { + command.arg("run").arg("--format").arg("json"); + if let Some(model) = options.model.as_deref() { + command.arg("-m").arg(model); + } + if let Some(agent_mode) = options.agent_mode.as_deref() { + command.arg("--agent").arg(agent_mode); + } + if let Some(variant) = options.variant.as_deref() { + command.arg("--variant").arg(variant); + } + if let Some(session_id) = options.session_id.as_deref() { + command.arg("-s").arg(session_id); + } + command.arg(&options.prompt); + } + AgentId::Amp => { + return Ok(build_amp_command(&path, &working_dir, options)); + } + } + + for (key, value) in &options.env { + command.env(key, value); + } + + Ok(command) + } + + pub fn resolve_binary(&self, agent: AgentId) -> Result { + let path = self.binary_path(agent); + if path.exists() { + return Ok(path); + } + if let Some(path) = find_in_path(agent.binary_name()) { + return Ok(path); + } + Err(AgentError::BinaryNotFound { agent }) + } +} + +#[derive(Debug, Clone)] +pub struct InstallOptions { + pub reinstall: bool, + pub version: Option, +} + +impl Default for InstallOptions { + fn default() -> Self { + Self { + reinstall: false, + version: None, + } + } +} + +#[derive(Debug, Clone)] +pub struct InstallResult { + pub path: PathBuf, + pub version: Option, +} + +#[derive(Debug, Clone)] +pub struct SpawnOptions { + pub prompt: String, + pub model: Option, + pub variant: Option, + pub agent_mode: Option, + pub permission_mode: Option, + pub session_id: Option, + pub working_dir: Option, + pub env: HashMap, +} + +impl SpawnOptions { + pub fn new(prompt: impl Into) -> Self { + Self { + prompt: prompt.into(), + model: None, + variant: None, + agent_mode: None, + permission_mode: None, + session_id: None, + working_dir: None, + env: HashMap::new(), + } + } +} + +#[derive(Debug, Clone)] +pub struct SpawnResult { + pub status: ExitStatus, + pub stdout: String, + pub stderr: String, + pub events: Vec, + pub session_id: Option, + pub result: Option, +} + +#[derive(Debug)] +pub struct StreamingSpawn { + pub child: Child, + pub stdout: Option, + pub stderr: Option, +} + +#[derive(Debug, Error)] +pub enum AgentError { + #[error("unsupported platform {os}/{arch}")] + UnsupportedPlatform { os: String, arch: String }, + #[error("unsupported agent {agent}")] + UnsupportedAgent { agent: String }, + #[error("binary not found for {agent}")] + BinaryNotFound { agent: AgentId }, + #[error("download failed: {url}")] + DownloadFailed { url: Url }, + #[error("http error: {0}")] + Http(#[from] reqwest::Error), + #[error("url parse error: {0}")] + UrlParse(#[from] url::ParseError), + #[error("io error: {0}")] + Io(#[from] io::Error), + #[error("extract failed: {0}")] + ExtractFailed(String), + #[error("resume unsupported for {agent}")] + ResumeUnsupported { agent: AgentId }, +} + +fn parse_version_output(output: &std::process::Output) -> Option { + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + let combined = format!("{}\n{}", stdout, stderr); + combined + .lines() + .map(str::trim) + .find(|line| !line.is_empty()) + .map(|line| line.to_string()) +} + +fn parse_jsonl(text: &str) -> Vec { + text.lines() + .filter_map(|line| serde_json::from_str::(line).ok()) + .collect() +} + +fn parse_jsonl_from_outputs(stdout: &str, stderr: &str) -> Vec { + let mut events = parse_jsonl(stdout); + events.extend(parse_jsonl(stderr)); + events +} + +fn codex_prompt_for_mode(prompt: &str, mode: Option<&str>) -> String { + match mode { + Some("plan") => format!("Make a plan before acting.\n\n{prompt}"), + _ => prompt.to_string(), + } +} + +fn extract_nested_string(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + if let Ok(index) = key.parse::() { + current = current.get(index)?; + } else { + current = current.get(*key)?; + } + } + current.as_str().map(|s| s.to_string()) +} + +fn extract_session_id(agent: AgentId, events: &[Value]) -> Option { + for event in events { + match agent { + AgentId::Claude | AgentId::Amp => { + if let Some(id) = event.get("session_id").and_then(Value::as_str) { + return Some(id.to_string()); + } + } + AgentId::Codex => { + if event.get("type").and_then(Value::as_str) == Some("thread.started") { + if let Some(id) = event.get("thread_id").and_then(Value::as_str) { + return Some(id.to_string()); + } + } + if let Some(id) = event.get("thread_id").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = event.get("threadId").and_then(Value::as_str) { + return Some(id.to_string()); + } + } + AgentId::Opencode => { + if let Some(id) = event.get("session_id").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = event.get("sessionID").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = event.get("sessionId").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = extract_nested_string(event, &["properties", "sessionID"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(event, &["properties", "part", "sessionID"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(event, &["session", "id"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(event, &["properties", "session", "id"]) { + return Some(id); + } + } + } + } + None +} + +fn extract_result_text(agent: AgentId, events: &[Value]) -> Option { + match agent { + AgentId::Claude | AgentId::Amp => { + for event in events { + if let Some(result) = event.get("result").and_then(Value::as_str) { + return Some(result.to_string()); + } + if let Some(text) = extract_nested_string(event, &["message", "content", "0", "text"]) { + return Some(text); + } + } + None + } + AgentId::Codex => { + let mut last = None; + for event in events { + if event.get("type").and_then(Value::as_str) == Some("item.completed") { + if let Some(item) = event.get("item") { + if item.get("type").and_then(Value::as_str) == Some("agent_message") { + if let Some(text) = item.get("text").and_then(Value::as_str) { + last = Some(text.to_string()); + } + } + } + } + if let Some(result) = event.get("result").and_then(Value::as_str) { + last = Some(result.to_string()); + } + if let Some(output) = event.get("output").and_then(Value::as_str) { + last = Some(output.to_string()); + } + if let Some(message) = event.get("message").and_then(Value::as_str) { + last = Some(message.to_string()); + } + } + last + } + AgentId::Opencode => { + let mut buffer = String::new(); + for event in events { + if event.get("type").and_then(Value::as_str) == Some("message.part.updated") { + if let Some(delta) = extract_nested_string(event, &["properties", "delta"]) { + buffer.push_str(&delta); + } + if let Some(content) = extract_nested_string(event, &["properties", "part", "content"]) { + buffer.push_str(&content); + } + } + if let Some(result) = event.get("result").and_then(Value::as_str) { + if buffer.is_empty() { + buffer.push_str(result); + } + } + } + if buffer.is_empty() { + None + } else { + Some(buffer) + } + } + } +} + +fn spawn_amp( + path: &Path, + working_dir: &Path, + options: &SpawnOptions, +) -> Result { + let flags = detect_amp_flags(path, working_dir).unwrap_or_default(); + let mut args: Vec<&str> = Vec::new(); + if flags.execute { + args.push("--execute"); + } else if flags.print { + args.push("--print"); + } + if flags.output_format { + args.push("--output-format"); + args.push("stream-json"); + } + if flags.dangerously_skip_permissions && options.permission_mode.as_deref() == Some("bypass") { + args.push("--dangerously-skip-permissions"); + } + + let mut command = Command::new(path); + command.current_dir(working_dir); + if let Some(model) = options.model.as_deref() { + command.arg("--model").arg(model); + } + if let Some(session_id) = options.session_id.as_deref() { + command.arg("--continue").arg(session_id); + } + command.args(&args).arg(&options.prompt); + for (key, value) in &options.env { + command.env(key, value); + } + let output = command.output().map_err(AgentError::Io)?; + if output.status.success() { + return Ok(output); + } + + let stderr = String::from_utf8_lossy(&output.stderr); + if stderr.contains("unknown option") + || stderr.contains("unknown flag") + || stderr.contains("User message must be provided") + { + return spawn_amp_fallback(path, working_dir, options); + } + + Ok(output) +} + +fn build_amp_command(path: &Path, working_dir: &Path, options: &SpawnOptions) -> Command { + let flags = detect_amp_flags(path, working_dir).unwrap_or_default(); + let mut command = Command::new(path); + command.current_dir(working_dir); + if let Some(model) = options.model.as_deref() { + command.arg("--model").arg(model); + } + if let Some(session_id) = options.session_id.as_deref() { + command.arg("--continue").arg(session_id); + } + if flags.execute { + command.arg("--execute"); + } else if flags.print { + command.arg("--print"); + } + if flags.output_format { + command.arg("--output-format").arg("stream-json"); + } + if flags.dangerously_skip_permissions && options.permission_mode.as_deref() == Some("bypass") { + command.arg("--dangerously-skip-permissions"); + } + command.arg(&options.prompt); + for (key, value) in &options.env { + command.env(key, value); + } + command +} + +#[derive(Debug, Default, Clone, Copy)] +struct AmpFlags { + execute: bool, + print: bool, + output_format: bool, + dangerously_skip_permissions: bool, +} + +fn detect_amp_flags(path: &Path, working_dir: &Path) -> Option { + let output = Command::new(path) + .current_dir(working_dir) + .arg("--help") + .output() + .ok()?; + let text = format!( + "{}\n{}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + Some(AmpFlags { + execute: text.contains("--execute"), + print: text.contains("--print"), + output_format: text.contains("--output-format"), + dangerously_skip_permissions: text.contains("--dangerously-skip-permissions"), + }) +} + +fn spawn_amp_fallback( + path: &Path, + working_dir: &Path, + options: &SpawnOptions, +) -> Result { + let mut attempts = vec![ + vec!["--execute"], + vec!["--print", "--output-format", "stream-json"], + vec!["--output-format", "stream-json"], + vec!["--dangerously-skip-permissions"], + vec![], + ]; + if options.permission_mode.as_deref() != Some("bypass") { + attempts.retain(|args| !args.contains(&"--dangerously-skip-permissions")); + } + + for args in attempts { + let mut command = Command::new(path); + command.current_dir(working_dir); + if let Some(model) = options.model.as_deref() { + command.arg("--model").arg(model); + } + if let Some(session_id) = options.session_id.as_deref() { + command.arg("--continue").arg(session_id); + } + if !args.is_empty() { + command.args(&args); + } + command.arg(&options.prompt); + for (key, value) in &options.env { + command.env(key, value); + } + let output = command.output().map_err(AgentError::Io)?; + if output.status.success() { + return Ok(output); + } + } + + let mut command = Command::new(path); + command.current_dir(working_dir); + if let Some(model) = options.model.as_deref() { + command.arg("--model").arg(model); + } + if let Some(session_id) = options.session_id.as_deref() { + command.arg("--continue").arg(session_id); + } + command.arg(&options.prompt); + for (key, value) in &options.env { + command.env(key, value); + } + Ok(command.output().map_err(AgentError::Io)?) +} + +fn find_in_path(binary_name: &str) -> Option { + let path_var = std::env::var_os("PATH")?; + for path in std::env::split_paths(&path_var) { + let candidate = path.join(binary_name); + if candidate.exists() { + return Some(candidate); + } + } + None +} + +fn download_bytes(url: &Url) -> Result, AgentError> { + let client = Client::builder().build()?; + let mut response = client.get(url.clone()).send()?; + if !response.status().is_success() { + return Err(AgentError::DownloadFailed { url: url.clone() }); + } + let mut bytes = Vec::new(); + response.read_to_end(&mut bytes)?; + Ok(bytes) +} + +fn install_claude(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> { + let version = match version { + Some(version) => version.to_string(), + None => { + let url = Url::parse( + "https://storage.googleapis.com/claude-code-dist-86c565f3-f756-42ad-8dfa-d59b1c096819/claude-code-releases/latest", + )?; + let text = String::from_utf8(download_bytes(&url)?).map_err(|err| AgentError::ExtractFailed(err.to_string()))?; + text.trim().to_string() + } + }; + + let platform_segment = match platform { + Platform::LinuxX64 => "linux-x64", + Platform::LinuxX64Musl => "linux-x64-musl", + Platform::LinuxArm64 => "linux-arm64", + Platform::MacosArm64 => "darwin-arm64", + Platform::MacosX64 => "darwin-x64", + }; + + let url = Url::parse(&format!( + "https://storage.googleapis.com/claude-code-dist-86c565f3-f756-42ad-8dfa-d59b1c096819/claude-code-releases/{version}/{platform_segment}/claude" + ))?; + let bytes = download_bytes(&url)?; + write_executable(path, &bytes)?; + Ok(()) +} + +fn install_amp(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> { + let version = match version { + Some(version) => version.to_string(), + None => { + let url = Url::parse("https://storage.googleapis.com/amp-public-assets-prod-0/cli/cli-version.txt")?; + let text = String::from_utf8(download_bytes(&url)?).map_err(|err| AgentError::ExtractFailed(err.to_string()))?; + text.trim().to_string() + } + }; + + let platform_segment = match platform { + Platform::LinuxX64 | Platform::LinuxX64Musl => "linux-x64", + Platform::LinuxArm64 => "linux-arm64", + Platform::MacosArm64 => "darwin-arm64", + Platform::MacosX64 => "darwin-x64", + }; + + let url = Url::parse(&format!( + "https://storage.googleapis.com/amp-public-assets-prod-0/cli/{version}/amp-{platform_segment}" + ))?; + let bytes = download_bytes(&url)?; + write_executable(path, &bytes)?; + Ok(()) +} + +fn install_codex(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> { + let target = match platform { + Platform::LinuxX64 | Platform::LinuxX64Musl => "x86_64-unknown-linux-musl", + Platform::LinuxArm64 => "aarch64-unknown-linux-musl", + Platform::MacosArm64 => "aarch64-apple-darwin", + Platform::MacosX64 => "x86_64-apple-darwin", + }; + + let url = match version { + Some(version) => Url::parse(&format!( + "https://github.com/openai/codex/releases/download/{version}/codex-{target}.tar.gz" + ))?, + None => Url::parse(&format!( + "https://github.com/openai/codex/releases/latest/download/codex-{target}.tar.gz" + ))?, + }; + + let bytes = download_bytes(&url)?; + let temp_dir = tempfile::tempdir()?; + let cursor = io::Cursor::new(bytes); + let mut archive = tar::Archive::new(GzDecoder::new(cursor)); + archive.unpack(temp_dir.path())?; + + let expected = format!("codex-{target}"); + let binary = find_file_recursive(temp_dir.path(), &expected)? + .ok_or_else(|| AgentError::ExtractFailed(format!("missing {expected}")))?; + move_executable(&binary, path)?; + Ok(()) +} + +fn install_opencode(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> { + match platform { + Platform::MacosArm64 => { + let url = match version { + Some(version) => Url::parse(&format!( + "https://github.com/anomalyco/opencode/releases/download/{version}/opencode-darwin-arm64.zip" + ))?, + None => Url::parse( + "https://github.com/anomalyco/opencode/releases/latest/download/opencode-darwin-arm64.zip", + )?, + }; + install_zip_binary(path, &url, "opencode") + } + Platform::MacosX64 => { + let url = match version { + Some(version) => Url::parse(&format!( + "https://github.com/anomalyco/opencode/releases/download/{version}/opencode-darwin-x64.zip" + ))?, + None => Url::parse( + "https://github.com/anomalyco/opencode/releases/latest/download/opencode-darwin-x64.zip", + )?, + }; + install_zip_binary(path, &url, "opencode") + } + _ => { + let platform_segment = match platform { + Platform::LinuxX64 => "linux-x64", + Platform::LinuxX64Musl => "linux-x64-musl", + Platform::LinuxArm64 => "linux-arm64", + Platform::MacosArm64 | Platform::MacosX64 => unreachable!(), + }; + let url = match version { + Some(version) => Url::parse(&format!( + "https://github.com/anomalyco/opencode/releases/download/{version}/opencode-{platform_segment}.tar.gz" + ))?, + None => Url::parse(&format!( + "https://github.com/anomalyco/opencode/releases/latest/download/opencode-{platform_segment}.tar.gz" + ))?, + }; + + let bytes = download_bytes(&url)?; + let temp_dir = tempfile::tempdir()?; + let cursor = io::Cursor::new(bytes); + let mut archive = tar::Archive::new(GzDecoder::new(cursor)); + archive.unpack(temp_dir.path())?; + let binary = find_file_recursive(temp_dir.path(), "opencode")? + .ok_or_else(|| AgentError::ExtractFailed("missing opencode".to_string()))?; + move_executable(&binary, path)?; + Ok(()) + } + } +} + +fn install_zip_binary(path: &Path, url: &Url, binary_name: &str) -> Result<(), AgentError> { + let bytes = download_bytes(url)?; + let reader = io::Cursor::new(bytes); + let mut archive = zip::ZipArchive::new(reader).map_err(|err| AgentError::ExtractFailed(err.to_string()))?; + let temp_dir = tempfile::tempdir()?; + for i in 0..archive.len() { + let mut file = archive + .by_index(i) + .map_err(|err| AgentError::ExtractFailed(err.to_string()))?; + if !file.name().ends_with(binary_name) { + continue; + } + let out_path = temp_dir.path().join(binary_name); + let mut out_file = fs::File::create(&out_path)?; + io::copy(&mut file, &mut out_file)?; + move_executable(&out_path, path)?; + return Ok(()); + } + Err(AgentError::ExtractFailed(format!("missing {binary_name}"))) +} + +fn write_executable(path: &Path, bytes: &[u8]) -> Result<(), AgentError> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + fs::write(path, bytes)?; + set_executable(path)?; + Ok(()) +} + +fn move_executable(source: &Path, dest: &Path) -> Result<(), AgentError> { + if let Some(parent) = dest.parent() { + fs::create_dir_all(parent)?; + } + if dest.exists() { + fs::remove_file(dest)?; + } + fs::copy(source, dest)?; + set_executable(dest)?; + Ok(()) +} + +#[cfg(unix)] +fn set_executable(path: &Path) -> Result<(), AgentError> { + use std::os::unix::fs::PermissionsExt; + let mut perms = fs::metadata(path)?.permissions(); + perms.set_mode(0o755); + fs::set_permissions(path, perms)?; + Ok(()) +} + +#[cfg(not(unix))] +fn set_executable(_path: &Path) -> Result<(), AgentError> { + Ok(()) +} + +fn find_file_recursive(dir: &Path, filename: &str) -> Result, AgentError> { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + if path.is_dir() { + if let Some(found) = find_file_recursive(&path, filename)? { + return Ok(Some(found)); + } + } else if let Some(name) = path.file_name().and_then(|s| s.to_str()) { + if name == filename { + return Ok(Some(path)); + } + } + } + Ok(None) +} diff --git a/engine/packages/sandbox-daemon/Cargo.toml b/engine/packages/sandbox-daemon/Cargo.toml index 4c68150..2af1f96 100644 --- a/engine/packages/sandbox-daemon/Cargo.toml +++ b/engine/packages/sandbox-daemon/Cargo.toml @@ -15,17 +15,17 @@ axum = "0.7" clap = { version = "4.5", features = ["derive"] } futures = "0.3" sandbox-daemon-error = { path = "../error" } -reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } -flate2 = "1.0" -tar = "0.4" -zip = { version = "0.6", default-features = false, features = ["deflate"] } -url = "2.5" +sandbox-daemon-agent-management = { path = "../agent-management" } +sandbox-daemon-agent-credentials = { path = "../agent-credentials" } +sandbox-daemon-universal-agent-schema = { path = "../universal-agent-schema" } +reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls", "stream"] } dirs = "5.0" -tempfile = "3.10" -time = { version = "0.3", features = ["parsing"] } -tokio = { version = "1.36", features = ["macros", "rt-multi-thread", "signal"] } +time = { version = "0.3", features = ["parsing", "formatting"] } +tokio = { version = "1.36", features = ["macros", "rt-multi-thread", "signal", "time"] } +tokio-stream = { version = "0.1", features = ["sync"] } tower-http = { version = "0.5", features = ["cors"] } utoipa = { version = "4.2", features = ["axum_extras"] } schemars = "0.8" [dev-dependencies] +tempfile = "3.10" diff --git a/engine/packages/sandbox-daemon/src/main.rs b/engine/packages/sandbox-daemon/src/main.rs index 9801eff..0391682 100644 --- a/engine/packages/sandbox-daemon/src/main.rs +++ b/engine/packages/sandbox-daemon/src/main.rs @@ -1,8 +1,10 @@ use std::io::Write; +use std::path::PathBuf; use clap::{Args, Parser, Subcommand}; use reqwest::blocking::Client as HttpClient; use reqwest::Method; +use sandbox_daemon_agent_management::agents::AgentManager; use sandbox_daemon_core::router::{ AgentInstallRequest, AppState, AuthConfig, CreateSessionRequest, MessageRequest, PermissionReply, PermissionReplyRequest, QuestionReplyRequest, @@ -14,6 +16,8 @@ use serde_json::Value; use thiserror::Error; use tower_http::cors::{Any, CorsLayer}; +const API_PREFIX: &str = "/v1"; + #[derive(Parser, Debug)] #[command(name = "sandbox-daemon")] #[command(about = "Sandbox daemon for managing coding agents", version)] @@ -125,10 +129,6 @@ struct CreateSessionArgs { model: Option, #[arg(long)] variant: Option, - #[arg(long = "agent-token")] - agent_token: Option, - #[arg(long)] - validate_token: bool, #[arg(long)] agent_version: Option, #[command(flatten)] @@ -237,7 +237,9 @@ fn run_server(cli: &Cli) -> Result<(), CliError> { return Err(CliError::MissingToken); }; - let state = AppState { auth }; + let agent_manager = + AgentManager::new(default_install_dir()).map_err(|err| CliError::Server(err.to_string()))?; + let state = AppState::new(auth, agent_manager); let mut router = build_router(state); if let Some(cors) = build_cors_layer(cli)? { @@ -258,6 +260,12 @@ fn run_server(cli: &Cli) -> Result<(), CliError> { }) } +fn default_install_dir() -> PathBuf { + dirs::data_dir() + .map(|dir| dir.join("sandbox-daemon").join("bin")) + .unwrap_or_else(|| PathBuf::from(".").join(".sandbox-daemon").join("bin")) +} + fn run_client(command: &Command, cli: &Cli) -> Result<(), CliError> { match command { Command::Agents(subcommand) => run_agents(&subcommand.command, cli), @@ -269,7 +277,7 @@ fn run_agents(command: &AgentsCommand, cli: &Cli) -> Result<(), CliError> { match command { AgentsCommand::List(args) => { let ctx = ClientContext::new(cli, args)?; - let response = ctx.get("/agents")?; + let response = ctx.get(&format!("{API_PREFIX}/agents"))?; print_json_response::(response) } AgentsCommand::Install(args) => { @@ -277,13 +285,13 @@ fn run_agents(command: &AgentsCommand, cli: &Cli) -> Result<(), CliError> { let body = AgentInstallRequest { reinstall: if args.reinstall { Some(true) } else { None }, }; - let path = format!("/agents/{}/install", args.agent); + let path = format!("{API_PREFIX}/agents/{}/install", args.agent); let response = ctx.post(&path, &body)?; print_empty_response(response) } AgentsCommand::Modes(args) => { let ctx = ClientContext::new(cli, &args.client)?; - let path = format!("/agents/{}/modes", args.agent); + let path = format!("{API_PREFIX}/agents/{}/modes", args.agent); let response = ctx.get(&path)?; print_json_response::(response) } @@ -300,11 +308,9 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> { permission_mode: args.permission_mode.clone(), model: args.model.clone(), variant: args.variant.clone(), - token: args.agent_token.clone(), - validate_token: if args.validate_token { Some(true) } else { None }, agent_version: args.agent_version.clone(), }; - let path = format!("/sessions/{}", args.session_id); + let path = format!("{API_PREFIX}/sessions/{}", args.session_id); let response = ctx.post(&path, &body)?; print_json_response::(response) } @@ -313,19 +319,19 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> { let body = MessageRequest { message: args.message.clone(), }; - let path = format!("/sessions/{}/messages", args.session_id); + let path = format!("{API_PREFIX}/sessions/{}/messages", args.session_id); let response = ctx.post(&path, &body)?; print_empty_response(response) } SessionsCommand::GetMessages(args) | SessionsCommand::Events(args) => { let ctx = ClientContext::new(cli, &args.client)?; - let path = format!("/sessions/{}/events", args.session_id); + let path = format!("{API_PREFIX}/sessions/{}/events", args.session_id); let response = ctx.get_with_query(&path, &[ ("offset", args.offset), ("limit", args.limit) ])?; print_json_response::(response) } SessionsCommand::EventsSse(args) => { let ctx = ClientContext::new(cli, &args.client)?; - let path = format!("/sessions/{}/events/sse", args.session_id); + let path = format!("{API_PREFIX}/sessions/{}/events/sse", args.session_id); let response = ctx.get_with_query(&path, &[("offset", args.offset)])?; print_text_response(response) } @@ -334,7 +340,7 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> { let answers: Vec> = serde_json::from_str(&args.answers)?; let body = QuestionReplyRequest { answers }; let path = format!( - "/sessions/{}/questions/{}/reply", + "{API_PREFIX}/sessions/{}/questions/{}/reply", args.session_id, args.question_id ); let response = ctx.post(&path, &body)?; @@ -343,7 +349,7 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> { SessionsCommand::RejectQuestion(args) => { let ctx = ClientContext::new(cli, &args.client)?; let path = format!( - "/sessions/{}/questions/{}/reject", + "{API_PREFIX}/sessions/{}/questions/{}/reject", args.session_id, args.question_id ); let response = ctx.post_empty(&path)?; @@ -355,7 +361,7 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> { reply: args.reply.clone(), }; let path = format!( - "/sessions/{}/permissions/{}/reply", + "{API_PREFIX}/sessions/{}/permissions/{}/reply", args.session_id, args.permission_id ); let response = ctx.post(&path, &body)?; diff --git a/engine/packages/sandbox-daemon/src/router.rs b/engine/packages/sandbox-daemon/src/router.rs index 1141511..756ee71 100644 --- a/engine/packages/sandbox-daemon/src/router.rs +++ b/engine/packages/sandbox-daemon/src/router.rs @@ -1,5 +1,10 @@ +use std::collections::{HashMap, HashSet}; use std::convert::Infallible; +use std::io::{BufRead, BufReader}; +use std::net::TcpListener; +use std::process::Stdio; use std::sync::Arc; +use std::time::Duration; use axum::extract::{Path, Query, State}; use axum::http::{HeaderMap, HeaderValue, Request, StatusCode}; @@ -9,15 +14,47 @@ use axum::response::{IntoResponse, Response, Sse}; use axum::routing::{get, post}; use axum::Json; use axum::Router; -use sandbox_daemon_error::{AgentError as AgentErrorPayload, ProblemDetails, SandboxError}; -use futures::stream; +use futures::{stream, StreamExt}; +use reqwest::Client; +use sandbox_daemon_error::{AgentError, ErrorType, ProblemDetails, SandboxError}; +use sandbox_daemon_universal_agent_schema::{ + convert_amp, convert_claude, convert_codex, convert_opencode, AttachmentSource, CrashInfo, + EventConversion, PermissionRequest, PermissionToolRef, QuestionInfo, QuestionOption, + QuestionRequest, QuestionToolRef, Started, UniversalEvent, UniversalEventData, + UniversalMessage, UniversalMessageParsed, UniversalMessagePart, +}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio::sync::{broadcast, mpsc, Mutex}; +use tokio_stream::wrappers::BroadcastStream; +use tokio::time::sleep; use utoipa::{OpenApi, ToSchema}; -#[derive(Debug, Clone)] +use sandbox_daemon_agent_management::agents::{ + AgentError as ManagerError, AgentId, AgentManager, InstallOptions, SpawnOptions, StreamingSpawn, +}; +use sandbox_daemon_agent_management::credentials::{ + extract_all_credentials, CredentialExtractionOptions, ExtractedCredentials, +}; + +#[derive(Debug)] pub struct AppState { - pub auth: AuthConfig, + auth: AuthConfig, + agent_manager: Arc, + session_manager: Arc, +} + +impl AppState { + pub fn new(auth: AuthConfig, agent_manager: AgentManager) -> Self { + let agent_manager = Arc::new(agent_manager); + let session_manager = Arc::new(SessionManager::new(agent_manager.clone())); + Self { + auth, + agent_manager, + session_manager, + } + } } #[derive(Debug, Clone)] @@ -38,7 +75,7 @@ impl AuthConfig { pub fn build_router(state: AppState) -> Router { let shared = Arc::new(state); - let router = Router::new() + let mut v1_router = Router::new() .route("/agents", get(list_agents)) .route("/agents/:agent/install", post(install_agent)) .route("/agents/:agent/modes", get(get_agent_modes)) @@ -61,10 +98,10 @@ pub fn build_router(state: AppState) -> Router { .with_state(shared.clone()); if shared.auth.token.is_some() { - router.layer(axum::middleware::from_fn_with_state(shared, require_token)) - } else { - router + v1_router = v1_router.layer(axum::middleware::from_fn_with_state(shared, require_token)); } + + Router::new().nest("/v1", v1_router) } #[derive(OpenApi)] @@ -95,12 +132,24 @@ pub fn build_router(state: AppState) -> Router { EventsResponse, UniversalEvent, UniversalEventData, - NoopMessage, + UniversalMessage, + UniversalMessageParsed, + UniversalMessagePart, + AttachmentSource, + Started, + CrashInfo, + QuestionRequest, + QuestionInfo, + QuestionOption, + QuestionToolRef, + PermissionRequest, + PermissionToolRef, QuestionReplyRequest, PermissionReplyRequest, PermissionReply, ProblemDetails, - AgentErrorPayload + ErrorType, + AgentError ) ), tags( @@ -126,6 +175,903 @@ impl IntoResponse for ApiError { } } +#[derive(Debug)] +struct SessionState { + session_id: String, + agent: AgentId, + agent_mode: String, + permission_mode: String, + model: Option, + variant: Option, + agent_session_id: Option, + next_event_id: u64, + events: Vec, + pending_questions: HashSet, + pending_permissions: HashSet, + broadcaster: broadcast::Sender, + opencode_stream_started: bool, +} + +impl SessionState { + fn new( + session_id: String, + agent: AgentId, + request: &CreateSessionRequest, + ) -> Result { + let (agent_mode, permission_mode) = normalize_modes( + agent, + request.agent_mode.as_deref(), + request.permission_mode.as_deref(), + )?; + let (broadcaster, _rx) = broadcast::channel(256); + + Ok(Self { + session_id, + agent, + agent_mode, + permission_mode, + model: request.model.clone(), + variant: request.variant.clone(), + agent_session_id: None, + next_event_id: 0, + events: Vec::new(), + pending_questions: HashSet::new(), + pending_permissions: HashSet::new(), + broadcaster, + opencode_stream_started: false, + }) + } + + fn record_conversion(&mut self, conversion: EventConversion) -> UniversalEvent { + let agent_session_id = conversion + .agent_session_id + .clone() + .or_else(|| self.agent_session_id.clone()); + if self.agent_session_id.is_none() { + self.agent_session_id = conversion.agent_session_id.clone(); + } + self.record_event(conversion.data, agent_session_id) + } + + fn record_event( + &mut self, + data: UniversalEventData, + agent_session_id: Option, + ) -> UniversalEvent { + self.next_event_id += 1; + let data = self.normalize_event_data(data); + let event = UniversalEvent { + id: self.next_event_id, + timestamp: now_rfc3339(), + session_id: self.session_id.clone(), + agent: self.agent.as_str().to_string(), + agent_session_id: agent_session_id.clone(), + data, + }; + self.update_pending(&event); + self.events.push(event.clone()); + let _ = self.broadcaster.send(event.clone()); + if self.agent_session_id.is_none() { + self.agent_session_id = agent_session_id; + } + event + } + + fn normalize_event_data(&self, mut data: UniversalEventData) -> UniversalEventData { + match &mut data { + UniversalEventData::QuestionAsked { question_asked } => { + question_asked.session_id = self.session_id.clone(); + } + UniversalEventData::PermissionAsked { permission_asked } => { + permission_asked.session_id = self.session_id.clone(); + } + _ => {} + } + data + } + + fn update_pending(&mut self, event: &UniversalEvent) { + match &event.data { + UniversalEventData::QuestionAsked { question_asked } => { + self.pending_questions.insert(question_asked.id.clone()); + } + UniversalEventData::PermissionAsked { permission_asked } => { + self.pending_permissions + .insert(permission_asked.id.clone()); + } + _ => {} + } + } + + fn take_question(&mut self, question_id: &str) -> bool { + self.pending_questions.remove(question_id) + } + + fn take_permission(&mut self, permission_id: &str) -> bool { + self.pending_permissions.remove(permission_id) + } +} + +#[derive(Debug)] +struct SessionManager { + agent_manager: Arc, + sessions: Mutex>, + opencode_server: Mutex>, + http_client: Client, +} + +#[derive(Debug)] +struct OpencodeServer { + base_url: String, + #[allow(dead_code)] + child: Option, +} + +struct SessionSubscription { + initial_events: Vec, + receiver: broadcast::Receiver, +} + +impl SessionManager { + fn new(agent_manager: Arc) -> Self { + Self { + agent_manager, + sessions: Mutex::new(HashMap::new()), + opencode_server: Mutex::new(None), + http_client: Client::new(), + } + } + + async fn create_session( + self: &Arc, + session_id: String, + request: CreateSessionRequest, + ) -> Result { + let agent_id = parse_agent_id(&request.agent)?; + { + let sessions = self.sessions.lock().await; + if sessions.contains_key(&session_id) { + return Err(SandboxError::SessionAlreadyExists { session_id }); + } + } + + let manager = self.agent_manager.clone(); + let agent_version = request.agent_version.clone(); + let agent_name = request.agent.clone(); + let install_result = tokio::task::spawn_blocking(move || { + manager.install( + agent_id, + InstallOptions { + reinstall: false, + version: agent_version, + }, + ) + }) + .await + .map_err(|err| SandboxError::InstallFailed { + agent: agent_name, + stderr: Some(err.to_string()), + })?; + install_result.map_err(|err| map_install_error(agent_id, err))?; + + let mut session = SessionState::new(session_id.clone(), agent_id, &request)?; + if agent_id == AgentId::Opencode { + let opencode_session_id = self.create_opencode_session().await?; + session.agent_session_id = Some(opencode_session_id); + } + + let started = Started { + message: Some("session.created".to_string()), + details: None, + }; + session.record_event( + UniversalEventData::Started { started }, + session.agent_session_id.clone(), + ); + + let agent_session_id = session.agent_session_id.clone(); + let mut sessions = self.sessions.lock().await; + sessions.insert(session_id.clone(), session); + drop(sessions); + + if agent_id == AgentId::Opencode { + self.ensure_opencode_stream(session_id).await?; + } + + Ok(CreateSessionResponse { + healthy: true, + error: None, + agent_session_id, + }) + } + + async fn agent_modes(&self, agent: AgentId) -> Result, SandboxError> { + if agent != AgentId::Opencode { + return Ok(agent_modes_for(agent)); + } + + match self.fetch_opencode_modes().await { + Ok(mut modes) => { + ensure_custom_mode(&mut modes); + if modes.is_empty() { + Ok(agent_modes_for(agent)) + } else { + Ok(modes) + } + } + Err(_) => Ok(agent_modes_for(agent)), + } + } + + async fn send_message( + self: &Arc, + session_id: String, + message: String, + ) -> Result<(), SandboxError> { + let session_snapshot = self.session_snapshot(&session_id).await?; + if session_snapshot.agent == AgentId::Opencode { + self.ensure_opencode_stream(session_id.clone()).await?; + self.send_opencode_prompt(&session_snapshot, &message).await?; + return Ok(()); + } + + let manager = self.agent_manager.clone(); + let prompt = message; + let credentials = tokio::task::spawn_blocking(move || { + let options = CredentialExtractionOptions::new(); + extract_all_credentials(&options) + }) + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + + let spawn_options = build_spawn_options(&session_snapshot, prompt, credentials); + let agent_id = session_snapshot.agent; + let spawn_result = tokio::task::spawn_blocking(move || manager.spawn_streaming(agent_id, spawn_options)) + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + + let spawn_result = spawn_result.map_err(|err| map_spawn_error(agent_id, err))?; + let manager = Arc::clone(self); + tokio::spawn(async move { + manager + .consume_spawn(session_id, agent_id, spawn_result) + .await; + }); + + Ok(()) + } + + async fn events( + &self, + session_id: &str, + offset: u64, + limit: Option, + ) -> Result { + let sessions = self.sessions.lock().await; + let session = sessions.get(session_id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: session_id.to_string(), + })?; + + let mut events: Vec = session + .events + .iter() + .filter(|event| event.id > offset) + .cloned() + .collect(); + + let has_more = if let Some(limit) = limit { + let limit = limit as usize; + if events.len() > limit { + events.truncate(limit); + true + } else { + false + } + } else { + false + }; + + Ok(EventsResponse { events, has_more }) + } + + async fn subscribe( + &self, + session_id: &str, + offset: u64, + ) -> Result { + let sessions = self.sessions.lock().await; + let session = sessions.get(session_id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: session_id.to_string(), + })?; + let initial_events = session + .events + .iter() + .filter(|event| event.id > offset) + .cloned() + .collect::>(); + let receiver = session.broadcaster.subscribe(); + Ok(SessionSubscription { + initial_events, + receiver, + }) + } + + async fn reply_question( + &self, + session_id: &str, + question_id: &str, + answers: Vec>, + ) -> Result<(), SandboxError> { + let (agent, agent_session_id) = { + let mut sessions = self.sessions.lock().await; + let session = sessions.get_mut(session_id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: session_id.to_string(), + })?; + if !session.take_question(question_id) { + return Err(SandboxError::InvalidRequest { + message: format!("unknown question id: {question_id}"), + }); + } + (session.agent, session.agent_session_id.clone()) + }; + + if agent == AgentId::Opencode { + let agent_session_id = agent_session_id.ok_or_else(|| SandboxError::InvalidRequest { + message: "missing OpenCode session id".to_string(), + })?; + self.opencode_question_reply(&agent_session_id, question_id, answers) + .await?; + } else { + // TODO: Forward question replies to subprocess agents. + } + + Ok(()) + } + + async fn reject_question( + &self, + session_id: &str, + question_id: &str, + ) -> Result<(), SandboxError> { + let (agent, agent_session_id) = { + let mut sessions = self.sessions.lock().await; + let session = sessions.get_mut(session_id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: session_id.to_string(), + })?; + if !session.take_question(question_id) { + return Err(SandboxError::InvalidRequest { + message: format!("unknown question id: {question_id}"), + }); + } + (session.agent, session.agent_session_id.clone()) + }; + + if agent == AgentId::Opencode { + let agent_session_id = agent_session_id.ok_or_else(|| SandboxError::InvalidRequest { + message: "missing OpenCode session id".to_string(), + })?; + self.opencode_question_reject(&agent_session_id, question_id) + .await?; + } else { + // TODO: Forward question rejections to subprocess agents. + } + + Ok(()) + } + + async fn reply_permission( + &self, + session_id: &str, + permission_id: &str, + reply: PermissionReply, + ) -> Result<(), SandboxError> { + let (agent, agent_session_id) = { + let mut sessions = self.sessions.lock().await; + let session = sessions.get_mut(session_id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: session_id.to_string(), + })?; + if !session.take_permission(permission_id) { + return Err(SandboxError::InvalidRequest { + message: format!("unknown permission id: {permission_id}"), + }); + } + (session.agent, session.agent_session_id.clone()) + }; + + if agent == AgentId::Opencode { + let agent_session_id = agent_session_id.ok_or_else(|| SandboxError::InvalidRequest { + message: "missing OpenCode session id".to_string(), + })?; + self.opencode_permission_reply(&agent_session_id, permission_id, reply) + .await?; + } else { + // TODO: Forward permission replies to subprocess agents. + } + + Ok(()) + } + + async fn session_snapshot(&self, session_id: &str) -> Result { + let sessions = self.sessions.lock().await; + let session = sessions.get(session_id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: session_id.to_string(), + })?; + Ok(SessionSnapshot::from(session)) + } + + async fn consume_spawn( + self: Arc, + session_id: String, + agent: AgentId, + spawn: StreamingSpawn, + ) { + let StreamingSpawn { + mut child, + stdout, + stderr, + } = spawn; + let (tx, mut rx) = mpsc::unbounded_channel::(); + + if let Some(stdout) = stdout { + let tx_stdout = tx.clone(); + tokio::task::spawn_blocking(move || { + read_lines(stdout, tx_stdout); + }); + } + if let Some(stderr) = stderr { + let tx_stderr = tx.clone(); + tokio::task::spawn_blocking(move || { + read_lines(stderr, tx_stderr); + }); + } + drop(tx); + + while let Some(line) = rx.recv().await { + if let Some(conversion) = parse_agent_line(agent, &line, &session_id) { + let _ = self.record_conversion(&session_id, conversion).await; + } + } + + let status = tokio::task::spawn_blocking(move || child.wait()).await; + match status { + Ok(Ok(status)) if status.success() => {} + Ok(Ok(status)) => { + let message = format!("agent exited with status {:?}", status); + self.record_error(&session_id, message, Some("process_exit".to_string()), None) + .await; + } + Ok(Err(err)) => { + self.record_error( + &session_id, + format!("failed to wait for agent: {err}"), + Some("process_wait_failed".to_string()), + None, + ) + .await; + } + Err(err) => { + self.record_error( + &session_id, + format!("failed to join agent task: {err}"), + Some("process_wait_failed".to_string()), + None, + ) + .await; + } + } + } + + async fn record_conversion( + &self, + session_id: &str, + conversion: EventConversion, + ) -> Result { + let mut sessions = self.sessions.lock().await; + let session = sessions.get_mut(session_id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: session_id.to_string(), + })?; + Ok(session.record_conversion(conversion)) + } + + async fn record_event( + &self, + session_id: &str, + data: UniversalEventData, + agent_session_id: Option, + ) -> Result { + let mut sessions = self.sessions.lock().await; + let session = sessions.get_mut(session_id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: session_id.to_string(), + })?; + Ok(session.record_event(data, agent_session_id)) + } + + async fn record_error( + &self, + session_id: &str, + message: String, + kind: Option, + details: Option, + ) { + let error = CrashInfo { message, kind, details }; + let _ = self + .record_event( + session_id, + UniversalEventData::Error { error }, + None, + ) + .await; + } + + async fn ensure_opencode_stream(self: &Arc, session_id: String) -> Result<(), SandboxError> { + let agent_session_id = { + let mut sessions = self.sessions.lock().await; + let session = sessions.get_mut(&session_id).ok_or_else(|| SandboxError::SessionNotFound { + session_id: session_id.clone(), + })?; + if session.opencode_stream_started { + return Ok(()); + } + let agent_session_id = session.agent_session_id.clone().ok_or_else(|| SandboxError::InvalidRequest { + message: "missing OpenCode session id".to_string(), + })?; + session.opencode_stream_started = true; + agent_session_id + }; + + let manager = Arc::clone(self); + tokio::spawn(async move { + manager + .stream_opencode_events(session_id, agent_session_id) + .await; + }); + + Ok(()) + } + + async fn stream_opencode_events(self: Arc, session_id: String, agent_session_id: String) { + let base_url = match self.ensure_opencode_server().await { + Ok(base_url) => base_url, + Err(err) => { + self.record_error( + &session_id, + format!("failed to start OpenCode server: {err}"), + Some("opencode_server".to_string()), + None, + ) + .await; + return; + } + }; + + let url = format!("{base_url}/event/subscribe"); + let response = match self.http_client.get(url).send().await { + Ok(response) => response, + Err(err) => { + self.record_error( + &session_id, + format!("OpenCode SSE connection failed: {err}"), + Some("opencode_stream".to_string()), + None, + ) + .await; + return; + } + }; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + self.record_error( + &session_id, + format!("OpenCode SSE error {status}: {body}"), + Some("opencode_stream".to_string()), + None, + ) + .await; + return; + } + + let mut accumulator = SseAccumulator::new(); + let mut stream = response.bytes_stream(); + while let Some(chunk) = stream.next().await { + let chunk = match chunk { + Ok(chunk) => chunk, + Err(err) => { + self.record_error( + &session_id, + format!("OpenCode SSE stream error: {err}"), + Some("opencode_stream".to_string()), + None, + ) + .await; + return; + } + }; + let text = String::from_utf8_lossy(&chunk); + for event_payload in accumulator.push(&text) { + let value: Value = match serde_json::from_str(&event_payload) { + Ok(value) => value, + Err(err) => { + let conversion = EventConversion::new(unparsed_message( + &event_payload, + &err.to_string(), + )); + let _ = self.record_conversion(&session_id, conversion).await; + continue; + } + }; + if !opencode_event_matches_session(&value, &agent_session_id) { + continue; + } + let conversion = match serde_json::from_value(value.clone()) { + Ok(event) => convert_opencode::event_to_universal(&event), + Err(err) => EventConversion::new(unparsed_message( + &value.to_string(), + &err.to_string(), + )), + }; + let _ = self.record_conversion(&session_id, conversion).await; + } + } + } + + async fn ensure_opencode_server(&self) -> Result { + { + let guard = self.opencode_server.lock().await; + if let Some(server) = guard.as_ref() { + return Ok(server.base_url.clone()); + } + } + + let manager = self.agent_manager.clone(); + let server = tokio::task::spawn_blocking(move || -> Result { + let path = manager + .resolve_binary(AgentId::Opencode) + .map_err(|err| map_spawn_error(AgentId::Opencode, err))?; + let port = find_available_port()?; + let mut command = std::process::Command::new(path); + command + .arg("serve") + .arg("--port") + .arg(port.to_string()) + .stdout(Stdio::null()) + .stderr(Stdio::null()); + let child = command.spawn().map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + Ok(OpencodeServer { + base_url: format!("http://127.0.0.1:{port}"), + child: Some(child), + }) + }) + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })??; + + { + let mut guard = self.opencode_server.lock().await; + if let Some(existing) = guard.as_ref() { + return Ok(existing.base_url.clone()); + } + *guard = Some(server); + } + let guard = self.opencode_server.lock().await; + guard + .as_ref() + .map(|server| server.base_url.clone()) + .ok_or_else(|| SandboxError::StreamError { + message: "OpenCode server missing".to_string(), + }) + } + + async fn fetch_opencode_modes(&self) -> Result, SandboxError> { + let base_url = self.ensure_opencode_server().await?; + let endpoints = [format!("{base_url}/app/agents"), format!("{base_url}/agents")]; + for url in endpoints { + let response = self.http_client.get(&url).send().await; + let response = match response { + Ok(response) => response, + Err(_) => continue, + }; + if !response.status().is_success() { + continue; + } + let value: Value = response.json().await.map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + let modes = parse_opencode_modes(&value); + if !modes.is_empty() { + return Ok(modes); + } + } + Err(SandboxError::StreamError { + message: "OpenCode agent modes unavailable".to_string(), + }) + } + + async fn create_opencode_session(&self) -> Result { + let base_url = self.ensure_opencode_server().await?; + let url = format!("{base_url}/session"); + for _ in 0..10 { + let response = self + .http_client + .post(&url) + .json(&json!({})) + .send() + .await; + let response = match response { + Ok(response) => response, + Err(_) => { + sleep(Duration::from_millis(200)).await; + continue; + } + }; + if !response.status().is_success() { + sleep(Duration::from_millis(200)).await; + continue; + } + let value: Value = response.json().await.map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + if let Some(id) = value.get("id").and_then(Value::as_str) { + return Ok(id.to_string()); + } + if let Some(id) = value.get("sessionId").and_then(Value::as_str) { + return Ok(id.to_string()); + } + if let Some(id) = value.get("session_id").and_then(Value::as_str) { + return Ok(id.to_string()); + } + return Err(SandboxError::StreamError { + message: format!("OpenCode session response missing id: {value}"), + }); + } + Err(SandboxError::StreamError { + message: "OpenCode session create failed after retries".to_string(), + }) + } + + async fn send_opencode_prompt( + &self, + session: &SessionSnapshot, + prompt: &str, + ) -> Result<(), SandboxError> { + let base_url = self.ensure_opencode_server().await?; + let session_id = session.agent_session_id.as_ref().ok_or_else(|| SandboxError::InvalidRequest { + message: "missing OpenCode session id".to_string(), + })?; + let url = format!("{base_url}/session/{session_id}/prompt"); + let mut body = json!({ + "agent": session.agent_mode.clone(), + "parts": [{ "type": "text", "text": prompt }] + }); + if let Some(model) = session.model.as_deref() { + if let Some((provider, model_id)) = model.split_once('/') { + body["model"] = json!({ + "providerID": provider, + "modelID": model_id + }); + } else { + body["model"] = json!({ "modelID": model }); + } + } + if let Some(variant) = session.variant.as_deref() { + body["variant"] = json!(variant); + } + + let response = self + .http_client + .post(url) + .json(&body) + .send() + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(SandboxError::StreamError { + message: format!("OpenCode prompt failed {status}: {body}"), + }); + } + + Ok(()) + } + + async fn opencode_question_reply( + &self, + _session_id: &str, + request_id: &str, + answers: Vec>, + ) -> Result<(), SandboxError> { + let base_url = self.ensure_opencode_server().await?; + let url = format!("{base_url}/question/reply"); + let response = self + .http_client + .post(url) + .json(&json!({ + "requestID": request_id, + "answers": answers + })) + .send() + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(SandboxError::StreamError { + message: format!("OpenCode question reply failed {status}: {body}"), + }); + } + Ok(()) + } + + async fn opencode_question_reject( + &self, + _session_id: &str, + request_id: &str, + ) -> Result<(), SandboxError> { + let base_url = self.ensure_opencode_server().await?; + let url = format!("{base_url}/question/reject"); + let response = self + .http_client + .post(url) + .json(&json!({ "requestID": request_id })) + .send() + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(SandboxError::StreamError { + message: format!("OpenCode question reject failed {status}: {body}"), + }); + } + Ok(()) + } + + async fn opencode_permission_reply( + &self, + _session_id: &str, + request_id: &str, + reply: PermissionReply, + ) -> Result<(), SandboxError> { + let base_url = self.ensure_opencode_server().await?; + let url = format!("{base_url}/permission/reply"); + let response = self + .http_client + .post(url) + .json(&json!({ + "requestID": request_id, + "reply": reply + })) + .send() + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(SandboxError::StreamError { + message: format!("OpenCode permission reply failed {status}: {body}"), + }); + } + Ok(()) + } +} + async fn require_token( State(state): State>, req: Request, @@ -169,36 +1115,6 @@ fn extract_token(headers: &HeaderMap) -> Option { None } -// TODO: Replace NoopMessage with universal agent schema once available. -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema, Default)] -pub struct NoopMessage { - #[serde(default, skip_serializing_if = "Option::is_none")] - pub note: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct UniversalEvent { - pub id: u64, - pub timestamp: String, - pub session_id: String, - pub agent: String, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub agent_session_id: Option, - pub data: UniversalEventData, -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)] -#[serde(untagged)] -#[allow(non_snake_case)] -pub enum UniversalEventData { - Message { message: NoopMessage }, - Started { started: NoopMessage }, - Error { error: NoopMessage }, - QuestionAsked { questionAsked: NoopMessage }, - PermissionAsked { permissionAsked: NoopMessage }, -} - #[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct AgentInstallRequest { @@ -250,10 +1166,6 @@ pub struct CreateSessionRequest { #[serde(default, skip_serializing_if = "Option::is_none")] pub variant: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - pub token: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub validate_token: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] pub agent_version: Option, } @@ -262,7 +1174,7 @@ pub struct CreateSessionRequest { pub struct CreateSessionResponse { pub healthy: bool, #[serde(default, skip_serializing_if = "Option::is_none")] - pub error: Option, + pub error: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub agent_session_id: Option, } @@ -324,7 +1236,7 @@ impl std::str::FromStr for PermissionReply { #[utoipa::path( post, - path = "/agents/{agent}/install", + path = "/v1/agents/{agent}/install", request_body = AgentInstallRequest, responses( (status = 204, description = "Agent installed"), @@ -336,17 +1248,36 @@ impl std::str::FromStr for PermissionReply { tag = "agents" )] async fn install_agent( + State(state): State>, Path(agent): Path, - Json(_request): Json, + Json(request): Json, ) -> Result { - validate_agent(&agent)?; - // TODO: Hook this up to sandbox agent management once available. + let agent_id = parse_agent_id(&agent)?; + let reinstall = request.reinstall.unwrap_or(false); + let manager = state.agent_manager.clone(); + + let result = tokio::task::spawn_blocking(move || { + manager.install( + agent_id, + InstallOptions { + reinstall, + version: None, + }, + ) + }) + .await + .map_err(|err| SandboxError::InstallFailed { + agent: agent.clone(), + stderr: Some(err.to_string()), + })?; + + result.map_err(|err| map_install_error(agent_id, err))?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path( get, - path = "/agents/{agent}/modes", + path = "/v1/agents/{agent}/modes", responses( (status = 200, body = AgentModesResponse), (status = 400, body = ProblemDetails) @@ -354,46 +1285,52 @@ async fn install_agent( params(("agent" = String, Path, description = "Agent id")), tag = "agents" )] -async fn get_agent_modes(Path(agent): Path) -> Result, ApiError> { - validate_agent(&agent)?; - let modes = vec![ - AgentModeInfo { - id: "build".to_string(), - name: "Build".to_string(), - description: "Default build mode".to_string(), - }, - AgentModeInfo { - id: "plan".to_string(), - name: "Plan".to_string(), - description: "Planning mode".to_string(), - }, - ]; +async fn get_agent_modes( + State(state): State>, + Path(agent): Path, +) -> Result, ApiError> { + let agent_id = parse_agent_id(&agent)?; + let modes = state.session_manager.agent_modes(agent_id).await?; Ok(Json(AgentModesResponse { modes })) } #[utoipa::path( get, - path = "/agents", + path = "/v1/agents", responses((status = 200, body = AgentListResponse)), tag = "agents" )] -async fn list_agents() -> Result, ApiError> { - let agents = known_agents() - .into_iter() - .map(|agent| AgentInfo { - id: agent.to_string(), - installed: false, - version: None, - path: None, - }) - .collect(); +async fn list_agents( + State(state): State>, +) -> Result, ApiError> { + let manager = state.agent_manager.clone(); + let agents = tokio::task::spawn_blocking(move || { + all_agents() + .into_iter() + .map(|agent_id| { + let installed = manager.is_installed(agent_id); + let version = manager.version(agent_id).ok().flatten(); + let path = manager.resolve_binary(agent_id).ok(); + AgentInfo { + id: agent_id.as_str().to_string(), + installed, + version, + path: path.map(|path| path.to_string_lossy().to_string()), + } + }) + .collect::>() + }) + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; Ok(Json(AgentListResponse { agents })) } #[utoipa::path( post, - path = "/sessions/{session_id}", + path = "/v1/sessions/{session_id}", request_body = CreateSessionRequest, responses( (status = 200, body = CreateSessionResponse), @@ -404,22 +1341,20 @@ async fn list_agents() -> Result, ApiError> { tag = "sessions" )] async fn create_session( + State(state): State>, Path(session_id): Path, Json(request): Json, ) -> Result, ApiError> { - validate_agent(&request.agent)?; - let _ = session_id; - // TODO: Hook this up to sandbox session management once available. - Ok(Json(CreateSessionResponse { - healthy: true, - error: None, - agent_session_id: None, - })) + let response = state + .session_manager + .create_session(session_id, request) + .await?; + Ok(Json(response)) } #[utoipa::path( post, - path = "/sessions/{session_id}/messages", + path = "/v1/sessions/{session_id}/messages", request_body = MessageRequest, responses( (status = 204, description = "Message accepted"), @@ -429,17 +1364,20 @@ async fn create_session( tag = "sessions" )] async fn post_message( + State(state): State>, Path(session_id): Path, - Json(_request): Json, + Json(request): Json, ) -> Result { - let _ = session_id; - // TODO: Hook this up to sandbox session messaging once available. + state + .session_manager + .send_message(session_id, request.message) + .await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path( get, - path = "/sessions/{session_id}/events", + path = "/v1/sessions/{session_id}/events", params( ("session_id" = String, Path, description = "Session id"), ("offset" = Option, Query, description = "Last seen event id (exclusive)"), @@ -452,20 +1390,21 @@ async fn post_message( tag = "sessions" )] async fn get_events( + State(state): State>, Path(session_id): Path, - Query(_query): Query, + Query(query): Query, ) -> Result, ApiError> { - let _ = session_id; - // TODO: Hook this up to sandbox session events once available. - Ok(Json(EventsResponse { - events: Vec::new(), - has_more: false, - })) + let offset = query.offset.unwrap_or(0); + let response = state + .session_manager + .events(&session_id, offset, query.limit) + .await?; + Ok(Json(response)) } #[utoipa::path( get, - path = "/sessions/{session_id}/events/sse", + path = "/v1/sessions/{session_id}/events/sse", params( ("session_id" = String, Path, description = "Session id"), ("offset" = Option, Query, description = "Last seen event id (exclusive)") @@ -474,18 +1413,36 @@ async fn get_events( tag = "sessions" )] async fn get_events_sse( + State(state): State>, Path(session_id): Path, - Query(_query): Query, + Query(query): Query, ) -> Result>>, ApiError> { - let _ = session_id; - // TODO: Hook this up to sandbox session events once available. - let stream = stream::empty::>(); + let offset = query.offset.unwrap_or(0); + let subscription = state + .session_manager + .subscribe(&session_id, offset) + .await?; + let initial_events = subscription.initial_events; + let receiver = subscription.receiver; + + let initial_stream = stream::iter(initial_events.into_iter().map(|event| { + Ok::(to_sse_event(event)) + })); + + let live_stream = BroadcastStream::new(receiver).filter_map(|result| async move { + match result { + Ok(event) => Some(Ok::(to_sse_event(event))), + Err(_) => None, + } + }); + + let stream = initial_stream.chain(live_stream); Ok(Sse::new(stream)) } #[utoipa::path( post, - path = "/sessions/{session_id}/questions/{question_id}/reply", + path = "/v1/sessions/{session_id}/questions/{question_id}/reply", request_body = QuestionReplyRequest, responses( (status = 204, description = "Question answered"), @@ -498,16 +1455,20 @@ async fn get_events_sse( tag = "sessions" )] async fn reply_question( - Path((_session_id, _question_id)): Path<(String, String)>, - Json(_request): Json, + State(state): State>, + Path((session_id, question_id)): Path<(String, String)>, + Json(request): Json, ) -> Result { - // TODO: Hook this up to sandbox question handling once available. + state + .session_manager + .reply_question(&session_id, &question_id, request.answers) + .await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path( post, - path = "/sessions/{session_id}/questions/{question_id}/reject", + path = "/v1/sessions/{session_id}/questions/{question_id}/reject", responses( (status = 204, description = "Question rejected"), (status = 404, body = ProblemDetails) @@ -519,15 +1480,19 @@ async fn reply_question( tag = "sessions" )] async fn reject_question( - Path((_session_id, _question_id)): Path<(String, String)>, + State(state): State>, + Path((session_id, question_id)): Path<(String, String)>, ) -> Result { - // TODO: Hook this up to sandbox question handling once available. + state + .session_manager + .reject_question(&session_id, &question_id) + .await?; Ok(StatusCode::NO_CONTENT) } #[utoipa::path( post, - path = "/sessions/{session_id}/permissions/{permission_id}/reply", + path = "/v1/sessions/{session_id}/permissions/{permission_id}/reply", request_body = PermissionReplyRequest, responses( (status = 204, description = "Permission reply accepted"), @@ -540,25 +1505,489 @@ async fn reject_question( tag = "sessions" )] async fn reply_permission( - Path((_session_id, _permission_id)): Path<(String, String)>, - Json(_request): Json, + State(state): State>, + Path((session_id, permission_id)): Path<(String, String)>, + Json(request): Json, ) -> Result { - // TODO: Hook this up to sandbox permission handling once available. + state + .session_manager + .reply_permission(&session_id, &permission_id, request.reply) + .await?; Ok(StatusCode::NO_CONTENT) } -fn known_agents() -> Vec<&'static str> { - vec!["claude", "codex", "opencode", "amp"] +fn all_agents() -> [AgentId; 4] { + [ + AgentId::Claude, + AgentId::Codex, + AgentId::Opencode, + AgentId::Amp, + ] } -fn validate_agent(agent: &str) -> Result<(), ApiError> { - if known_agents().iter().any(|known| known == &agent) { - Ok(()) - } else { - Err(SandboxError::UnsupportedAgent { - agent: agent.to_string(), +fn parse_agent_id(agent: &str) -> Result { + AgentId::parse(agent).ok_or_else(|| SandboxError::UnsupportedAgent { + agent: agent.to_string(), + }) +} + +fn agent_modes_for(agent: AgentId) -> Vec { + match agent { + AgentId::Opencode => vec![ + AgentModeInfo { + id: "build".to_string(), + name: "Build".to_string(), + description: "Default build mode".to_string(), + }, + AgentModeInfo { + id: "plan".to_string(), + name: "Plan".to_string(), + description: "Planning mode".to_string(), + }, + AgentModeInfo { + id: "custom".to_string(), + name: "Custom".to_string(), + description: "Any user-defined OpenCode agent name".to_string(), + }, + ], + AgentId::Codex => vec![ + AgentModeInfo { + id: "build".to_string(), + name: "Build".to_string(), + description: "Default build mode".to_string(), + }, + AgentModeInfo { + id: "plan".to_string(), + name: "Plan".to_string(), + description: "Planning mode via prompt prefix".to_string(), + }, + ], + AgentId::Claude => vec![ + AgentModeInfo { + id: "build".to_string(), + name: "Build".to_string(), + description: "Default build mode".to_string(), + }, + AgentModeInfo { + id: "plan".to_string(), + name: "Plan".to_string(), + description: "Plan mode (requires permissionMode=plan)".to_string(), + }, + ], + AgentId::Amp => vec![AgentModeInfo { + id: "build".to_string(), + name: "Build".to_string(), + description: "Default build mode".to_string(), + }], + } +} + +fn normalize_agent_mode(agent: AgentId, agent_mode: Option<&str>) -> Result { + let mode = agent_mode.unwrap_or("build"); + match agent { + AgentId::Opencode => Ok(mode.to_string()), + AgentId::Codex => match mode { + "build" | "plan" => Ok(mode.to_string()), + value => Err(SandboxError::ModeNotSupported { + agent: agent.as_str().to_string(), + mode: value.to_string(), + } + .into()), + }, + AgentId::Claude => match mode { + "build" | "plan" => Ok(mode.to_string()), + value => Err(SandboxError::ModeNotSupported { + agent: agent.as_str().to_string(), + mode: value.to_string(), + } + .into()), + }, + AgentId::Amp => match mode { + "build" => Ok("build".to_string()), + value => Err(SandboxError::ModeNotSupported { + agent: agent.as_str().to_string(), + mode: value.to_string(), + } + .into()), + }, + } +} + +fn normalize_permission_mode( + agent: AgentId, + permission_mode: Option<&str>, +) -> Result { + let mode = match permission_mode.unwrap_or("default") { + "default" | "plan" | "bypass" => permission_mode.unwrap_or("default"), + value => { + return Err(SandboxError::InvalidRequest { + message: format!("invalid permission mode: {value}"), + } + .into()) + } + }; + let supported = match agent { + AgentId::Claude | AgentId::Codex => matches!(mode, "default" | "plan" | "bypass"), + AgentId::Amp => matches!(mode, "default" | "bypass"), + AgentId::Opencode => matches!(mode, "default"), + }; + if !supported { + return Err(SandboxError::ModeNotSupported { + agent: agent.as_str().to_string(), + mode: mode.to_string(), + } + .into()); + } + Ok(mode.to_string()) +} + +fn normalize_modes( + agent: AgentId, + agent_mode: Option<&str>, + permission_mode: Option<&str>, +) -> Result<(String, String), SandboxError> { + let agent_mode = normalize_agent_mode(agent, agent_mode)?; + if agent == AgentId::Claude && agent_mode == "plan" { + if let Some(permission_mode) = permission_mode { + if permission_mode != "plan" { + return Err(SandboxError::InvalidRequest { + message: "Claude agentMode=plan requires permissionMode=plan".to_string(), + } + .into()); + } + } + let permission_mode = normalize_permission_mode(agent, Some("plan"))?; + return Ok((agent_mode, permission_mode)); + } + let permission_mode = normalize_permission_mode(agent, permission_mode)?; + Ok((agent_mode, permission_mode)) +} + +fn map_install_error(agent: AgentId, err: ManagerError) -> SandboxError { + match err { + ManagerError::UnsupportedAgent { agent } => SandboxError::UnsupportedAgent { agent }, + ManagerError::BinaryNotFound { .. } => SandboxError::AgentNotInstalled { + agent: agent.as_str().to_string(), + }, + ManagerError::ResumeUnsupported { agent } => SandboxError::InvalidRequest { + message: format!("resume unsupported for {agent}"), + }, + ManagerError::UnsupportedPlatform { .. } + | ManagerError::DownloadFailed { .. } + | ManagerError::Http(_) + | ManagerError::UrlParse(_) + | ManagerError::Io(_) + | ManagerError::ExtractFailed(_) => SandboxError::InstallFailed { + agent: agent.as_str().to_string(), + stderr: Some(err.to_string()), + }, + } +} + +fn map_spawn_error(agent: AgentId, err: ManagerError) -> SandboxError { + match err { + ManagerError::BinaryNotFound { .. } => SandboxError::AgentNotInstalled { + agent: agent.as_str().to_string(), + }, + ManagerError::ResumeUnsupported { agent } => SandboxError::InvalidRequest { + message: format!("resume unsupported for {agent}"), + }, + _ => SandboxError::AgentProcessExited { + agent: agent.as_str().to_string(), + exit_code: None, + stderr: Some(err.to_string()), + }, + } +} + +fn build_spawn_options( + session: &SessionSnapshot, + prompt: String, + credentials: ExtractedCredentials, +) -> SpawnOptions { + let mut options = SpawnOptions::new(prompt); + options.model = session.model.clone(); + options.variant = session.variant.clone(); + options.agent_mode = Some(session.agent_mode.clone()); + options.permission_mode = Some(session.permission_mode.clone()); + options.session_id = session.agent_session_id.clone().or_else(|| { + if session.agent == AgentId::Opencode { + Some(session.session_id.clone()) + } else { + None + } + }); + if let Some(anthropic) = credentials.anthropic { + options + .env + .entry("ANTHROPIC_API_KEY".to_string()) + .or_insert(anthropic.api_key.clone()); + options + .env + .entry("CLAUDE_API_KEY".to_string()) + .or_insert(anthropic.api_key); + } + if let Some(openai) = credentials.openai { + options + .env + .entry("OPENAI_API_KEY".to_string()) + .or_insert(openai.api_key.clone()); + options + .env + .entry("CODEX_API_KEY".to_string()) + .or_insert(openai.api_key); + } + options +} + +fn read_lines(reader: R, sender: mpsc::UnboundedSender) { + let mut reader = BufReader::new(reader); + let mut line = String::new(); + loop { + line.clear(); + match reader.read_line(&mut line) { + Ok(0) => break, + Ok(_) => { + let trimmed = line.trim_end_matches(&['\r', '\n'][..]).to_string(); + if sender.send(trimmed).is_err() { + break; + } + } + Err(_) => break, + } + } +} + +fn parse_agent_line(agent: AgentId, line: &str, session_id: &str) -> Option { + let trimmed = line.trim(); + if trimmed.is_empty() { + return None; + } + let value: Value = match serde_json::from_str(trimmed) { + Ok(value) => value, + Err(err) => { + return Some(EventConversion::new(unparsed_message( + trimmed, + &err.to_string(), + ))); + } + }; + let conversion = match agent { + AgentId::Claude => { + convert_claude::event_to_universal_with_session(&value, session_id.to_string()) + } + AgentId::Codex => match serde_json::from_value(value.clone()) { + Ok(event) => convert_codex::event_to_universal(&event), + Err(err) => EventConversion::new(unparsed_message( + &value.to_string(), + &err.to_string(), + )), + }, + AgentId::Opencode => match serde_json::from_value(value.clone()) { + Ok(event) => convert_opencode::event_to_universal(&event), + Err(err) => EventConversion::new(unparsed_message( + &value.to_string(), + &err.to_string(), + )), + }, + AgentId::Amp => match serde_json::from_value(value.clone()) { + Ok(event) => convert_amp::event_to_universal(&event), + Err(err) => EventConversion::new(unparsed_message( + &value.to_string(), + &err.to_string(), + )), + }, + }; + Some(conversion) +} + +fn opencode_event_matches_session(value: &Value, session_id: &str) -> bool { + match extract_opencode_session_id(value) { + Some(id) => id == session_id, + None => false, + } +} + +fn extract_opencode_session_id(value: &Value) -> Option { + if let Some(id) = value.get("session_id").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = value.get("sessionID").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = value.get("sessionId").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = extract_nested_string(value, &["properties", "sessionID"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(value, &["properties", "part", "sessionID"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(value, &["session", "id"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(value, &["properties", "session", "id"]) { + return Some(id); + } + None +} + +fn extract_nested_string(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + if let Ok(index) = key.parse::() { + current = current.get(index)?; + } else { + current = current.get(*key)?; + } + } + current.as_str().map(|s| s.to_string()) +} + +fn find_available_port() -> Result { + for port in 4200..=4300 { + if TcpListener::bind(("127.0.0.1", port)).is_ok() { + return Ok(port); + } + } + Err(SandboxError::StreamError { + message: "no available OpenCode port".to_string(), + }) +} + +struct SseAccumulator { + buffer: String, + data_lines: Vec, +} + +impl SseAccumulator { + fn new() -> Self { + Self { + buffer: String::new(), + data_lines: Vec::new(), + } + } + + fn push(&mut self, chunk: &str) -> Vec { + self.buffer.push_str(chunk); + let mut events = Vec::new(); + while let Some(pos) = self.buffer.find('\n') { + let mut line = self.buffer[..pos].to_string(); + self.buffer.drain(..=pos); + if line.ends_with('\r') { + line.pop(); + } + if line.is_empty() { + if !self.data_lines.is_empty() { + events.push(self.data_lines.join("\n")); + self.data_lines.clear(); + } + continue; + } + if let Some(data) = line.strip_prefix("data:") { + self.data_lines.push(data.trim_start().to_string()); + } + } + events + } +} + +fn parse_opencode_modes(value: &Value) -> Vec { + let mut modes = Vec::new(); + let mut seen = HashSet::new(); + + let items = value + .as_array() + .or_else(|| value.get("agents").and_then(Value::as_array)) + .or_else(|| value.get("data").and_then(Value::as_array)); + + let Some(items) = items else { return modes }; + + for item in items { + let id = item + .get("id") + .and_then(Value::as_str) + .or_else(|| item.get("slug").and_then(Value::as_str)) + .or_else(|| item.get("name").and_then(Value::as_str)); + let Some(id) = id else { continue }; + if !seen.insert(id.to_string()) { + continue; + } + let name = item + .get("name") + .and_then(Value::as_str) + .unwrap_or(id) + .to_string(); + let description = item + .get("description") + .and_then(Value::as_str) + .unwrap_or("") + .to_string(); + modes.push(AgentModeInfo { + id: id.to_string(), + name, + description, + }); + } + + modes +} + +fn ensure_custom_mode(modes: &mut Vec) { + if modes.iter().any(|mode| mode.id == "custom") { + return; + } + modes.push(AgentModeInfo { + id: "custom".to_string(), + name: "Custom".to_string(), + description: "Any user-defined OpenCode agent name".to_string(), + }); +} + +fn unparsed_message(raw: &str, error: &str) -> UniversalEventData { + UniversalEventData::Message { + message: UniversalMessage::Unparsed { + raw: Value::String(raw.to_string()), + error: Some(error.to_string()), + }, + } +} + +fn now_rfc3339() -> String { + time::OffsetDateTime::now_utc() + .format(&time::format_description::well_known::Rfc3339) + .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()) +} + +fn to_sse_event(event: UniversalEvent) -> Event { + Event::default() + .json_data(&event) + .unwrap_or_else(|_| Event::default().data("{}")) +} + +#[derive(Clone, Debug)] +struct SessionSnapshot { + session_id: String, + agent: AgentId, + agent_mode: String, + permission_mode: String, + model: Option, + variant: Option, + agent_session_id: Option, +} + +impl From<&SessionState> for SessionSnapshot { + fn from(session: &SessionState) -> Self { + Self { + session_id: session.session_id.clone(), + agent: session.agent, + agent_mode: session.agent_mode.clone(), + permission_mode: session.permission_mode.clone(), + model: session.model.clone(), + variant: session.variant.clone(), + agent_session_id: session.agent_session_id.clone(), } - .into()) } } diff --git a/spec.md b/spec.md index 17e6a8d..f14f94f 100644 --- a/spec.md +++ b/spec.md @@ -4,6 +4,8 @@ i need to build a library that is a universal api to work with agents - agent = claude code, codex, and opencode -> the acutal binary/sdk that runs the coding agent - agent mode = what the agent does, for example build/plan agent mode +- agent (id) vs agent mode: `agent` selects the implementation (claude/codex/opencode/amp), `agentMode` selects behavior (build/plan/custom). These are different from `permissionMode` (capability restrictions). +- session id vs agent session id: session id is the primary id provided by the client; agent session id is the underlying id from the agent and must be exposed but is not the primary id. - model = claude, codex, gemni, etc -> the model that's use din the agent - variant = variant on the model if exists, eg low, mid, high, xhigh for codex @@ -27,7 +29,6 @@ this also needs to support quesitons (ie human in the loop) these agents all have differnet ways of working with them. - claude code uses headless mode -- codex uses a typescript sdk - opencode uses a server ## component: daemon @@ -60,13 +61,18 @@ sandbox-daemon sessions get-messages --endpoint xxxx --token xxxx ### http api -POST /agents/{}/install (this will install the agent) -{} +POST /v1/agents/{}/install (this will install the agent) +{ reinstall?: boolean } +- `reinstall: true` forces download even if installed version matches latest. -GET /agents/{}/modes +GET /v1/agents/{}/modes < { modes: [{ id: "build", name: "Build", description: "..." }, ...] } -POST /sessions/{} (will install agent if not already installed) +GET /v1/agents +< { agents: [{ id: "claude" | "codex" | "opencode" | "amp", installed: boolean, version?: string, path?: string }] } +- Version should be checked at request time. `path` reflects the configured install location. + +POST /v1/sessions/{} (will install agent if not already installed) > { agent: "claude" | "codex" | "opencode", @@ -74,15 +80,16 @@ POST /sessions/{} (will install agent if not already installed) permissionMode?: "default" | "plan" | "bypass", // Permission restrictions model?: string, variant?: string, - token?: string, - validateToken?: boolean, agentVersion?: string } < { healthy: boolean, - error?: AgentError + error?: AgentError, + agentSessionId?: string } +- The client-provided session id is primary; `agentSessionId` is the underlying agent id (may be unknown until first prompt). +- Auth uses the daemon-level token (`Authorization` / `x-sandbox-token`); per-session tokens are not supported. // agentMode vs permissionMode: // - agentMode = what the agent DOES (behavior, system prompt) @@ -96,28 +103,28 @@ POST /sessions/{} (will install agent if not already installed) // - permissionMode "bypass" = skip all permission checks (dangerous) // - agentMode "plan" != permissionMode "plan" (one is behavior, one is restriction) -POST /sessions/{}/messages +POST /v1/sessions/{}/messages { message: string } -GET /sessions/{}/events?offset=x&limit=x +GET /v1/sessions/{}/events?offset=x&limit=x < { events: UniversalEvent[], hasMore: bool } -GET /sessions/{}/events/sse?offset=x +GET /v1/sessions/{}/events/sse?offset=x - same as above but using sse -POST /sessions/{}/questions/{questionId}/reply -{ answers: string[][] } // Array per question of selected option labels +POST /v1/sessions/{}/questions/{questionId}/reply +{ answers: string[][] } // Array per question of selected option labels (multi-select supported) -POST /sessions/{}/questions/{questionId}/reject +POST /v1/sessions/{}/questions/{questionId}/reject {} -POST /sessions/{}/permissions/{permissionId}/reply +POST /v1/sessions/{}/permissions/{permissionId}/reply { reply: "once" | "always" | "reject" } note: Claude's plan approval (ExitPlanMode) is converted to a question event with approve/reject options. No separate endpoint needed. @@ -125,6 +132,16 @@ note: Claude's plan approval (ExitPlanMode) is converted to a question event wit types: type UniversalEvent = + { + id: number, // Monotonic per-session id (used for offset) + timestamp: string, // RFC3339 + sessionId: string, // Primary id provided by client + agent: string, // Agent id (claude/codex/opencode/amp) + agentSessionId?: string, // Underlying agent session/thread id (not primary) + data: UniversalEventData + } + +type UniversalEventData = | { message: UniversalMessage } | { started: Started } | { error: CrashInfo } @@ -135,6 +152,34 @@ type UniversalEvent = type AgentError = { tokenError: ... } | { processExisted: ... } | { installFailed: ... } | etc +### error taxonomy + +All error responses use RFC 7807 Problem Details and map to a Rust `thiserror` enum. Canonical `type` values should be stable strings (e.g. `urn:sandbox-daemon:error:agent_not_installed`). + +Required error types: + +- `invalid_request` (400): malformed JSON, missing fields, invalid enum values +- `unsupported_agent` (400): unknown agent id +- `agent_not_installed` (404): agent binary missing +- `install_failed` (500): install attempted and failed +- `agent_process_exited` (500): agent subprocess exited unexpectedly +- `token_invalid` (401): token missing/invalid when required +- `permission_denied` (403): operation not allowed by permissionMode or config +- `session_not_found` (404): unknown session id +- `session_already_exists` (409): attempting to create session with existing id +- `mode_not_supported` (400): agentMode not available for agent +- `stream_error` (502): streaming/I/O failure +- `timeout` (504): agent or request timed out + +The Rust error enum should capture context (agent id, session id, exit code, stderr, etc.) and translate to Problem Details in the HTTP layer and CLI. The `AgentError` payloads used in JSON responses should be derived from the same enum so HTTP and CLI stay consistent. + +### offset semantics + +- `offset` is the last-seen `UniversalEvent.id` (exclusive). +- `GET /v1/sessions/{id}/events` returns events with `id > offset`, ordered ascending. +- `offset` defaults to `0` (or the earliest id) if not provided. +- SSE endpoint uses the same semantics and continues streaming events after the initial batch. + ### schema converters we need to have a 2 way conversion for both: @@ -222,6 +267,13 @@ A single long-running server handles multiple sessions. The daemon connects to t | OpenCode | Shared server | Native server support, lower latency | | Amp | Subprocess per session | No server mode available | +#### agent mode discovery + +- **OpenCode**: discover via server API (see `client.app.agents()` in `research/agents/opencode.md`). +- **Codex**: no discovery; hardcode supported modes (behavior via prompt prefixes). +- **Claude Code**: no discovery; hardcode supported modes (behavior mostly via prompt/policy). +- **Amp**: no discovery; hardcode supported modes (typically just `build`). + #### installation Before spawning, agents must be installed. **We curl raw binaries directly** - no npm, brew, install scripts, or other package managers. @@ -384,11 +436,12 @@ this machine is already authenticated with codex & claude & opencode (for codex) ## testing frontend -in frontend/packages/web/ build a vite server that: +in frontend/packages/web/ build a vite + react app that: - connect screen: prompts the user to provide an endpoint & optional token - shows instructions on how to run the sandbox-daemon (including cors) -- agent screen: provides a full agent ui + - if gets error or cors error, instruct the user to ensure they have cors flags enabled +- agent screen: provides a full agent ui covering all of the features. also includes a log of all http requests in the ui with a copy button for the curl command ## component: sdks @@ -397,6 +450,11 @@ we need to auto-generate types from our json schema for these languages - typescript sdk - expose our http api as a typescript sdk - update claude.md to specify that when changing api, we need to update the typescript sdk + the cli to interact with it + - impelment two main entrypoint: connect to endpoint + token or run locally (which spawns this binary as a subprocess, add todo to set up release pipeline and auto-pull the binary) + +### typescript sdk approach + +Use OpenAPI (from utoipa) + `openapi-typescript` to generate types, and implement a thin custom client wrapper (fetch-based) around the generated types. Avoid full client generators to keep the output small and stable. ## examples @@ -432,45 +490,3 @@ write a readme that doubles as docs for: - typescript sdk use the collapsible github sections for things like each api endpoint or each typescript sdk endpoint to collapse more info. this keeps the page readable. - -## spec todo - -- generate common denominator with conversion functions -- how should we handle the tokens for auth? - -## future problems to visit - -- api features - - list agent modes available - - list models available - - handle planning mode -- api key gateway -- configuring mcp/skills/etc -- process management inside container -- otel -- better authentication systems -- s3-based file system -- ai sdk compatibility for their ecosystem (useChat, etc) -- resumable messages -- todo lists -- all other features -- misc - - bootstrap tool that extracts tokens from the current system -- skill -- pre-package these as bun binaries instead of npm installations -- build & release pipeline with musl -- agent feature matrix for api features -- tunnels - -## future work - -- mcp integration (can connect to given endpoints) -- provide a pty to access the agent data -- other agent features like file system -- python sdk - -## misc - -comparison to agentapi: -- it does not use the pty since we need to get more information from the agent - diff --git a/spec/im-not-sure.md b/spec/im-not-sure.md index c51e8e5..290178a 100644 --- a/spec/im-not-sure.md +++ b/spec/im-not-sure.md @@ -1,3 +1,5 @@ -# Open Questions +# Open Questions / Ambiguities -- None yet. +- OpenCode server HTTP paths and payloads may differ; current implementation assumes `POST /session`, `POST /session/{id}/prompt`, and `GET /event/subscribe` with JSON `data:` SSE frames. +- OpenCode question/permission reply endpoints are assumed as `POST /question/reply`, `/question/reject`, `/permission/reply` with `requestID` fields; confirm actual API shape. +- SSE events may not always include `sessionID`/`sessionId` fields; confirm if filtering should use a different field. diff --git a/spec/required-tests.md b/spec/required-tests.md index 5465279..96a144f 100644 --- a/spec/required-tests.md +++ b/spec/required-tests.md @@ -1,5 +1,7 @@ # Required Tests -- `test_agents_install_version_spawn` (installs, checks version, spawns prompt for Claude/Codex/OpenCode; Amp spawn runs only if `~/.amp/config.json` exists) -- daemon http api: smoke tests for each endpoint response shape/status -- cli: subcommands hit expected endpoints and handle error responses +- Session manager streams JSONL line-by-line for Claude/Codex/Amp and yields incremental events. +- `/sessions/{id}/messages` returns immediately while background ingestion populates `/events` and `/events/sse`. +- SSE subscription delivers live events after the initial offset batch. +- OpenCode server mode: create session, send prompt, and receive SSE events filtered to the session. +- OpenCode question/permission reply endpoints forward to server APIs. diff --git a/todo.md b/todo.md index 63742b3..8c243d7 100644 --- a/todo.md +++ b/todo.md @@ -1,9 +1,98 @@ -# TODO +# TODO (from spec.md) -- [x] Scaffold `engine/packages/sandbox-daemon` crate -- [x] Implement agent management modules (install/version/spawn basics) -- [x] Add tests for agent install/version/spawn -- [x] Track required tests in `spec/required-tests.md` -- [x] Track open questions in `spec/im-not-sure.md` -- [ ] Hook sandbox/session management into the daemon router handlers -- [ ] Replace noop schemas with universal agent schema and remove the old schema +## Universal API + Types +- [x] Define universal base types for agent input/output (common denominator across schemas) +- [x] Add universal question + permission types (HITL) and ensure they are supported end-to-end +- [x] Define `UniversalEvent` + `UniversalEventData` union and `AgentError` shape +- [x] Define a universal message type for "failed to parse" with raw JSON payload +- [x] Implement 2-way converters: + - [x] Universal input message <-> agent-specific input + - [x] Universal event <-> agent-specific event +- [x] Enforce agentMode vs permissionMode semantics + defaults at the API boundary +- [x] Ensure session id vs agentSessionId semantics are respected and surfaced consistently + +## Daemon (Rust HTTP server) +- [x] Build axum router + utoipa + schemars integration +- [x] Implement RFC 7807 Problem Details error responses backed by a `thiserror` enum +- [x] Implement canonical error `type` values + required error variants from spec +- [x] Implement offset semantics for events (exclusive last-seen id, default offset 0) +- [x] Implement SSE endpoint for events with same semantics as JSON endpoint +- [x] Replace in-memory session store with sandbox session manager (questions/permissions routing, long-lived processes) + +## CLI +- [x] Implement clap CLI flags: `--token`, `--no-token`, `--host`, `--port`, CORS flags +- [x] Implement a CLI endpoint for every HTTP endpoint +- [ ] Update `CLAUDE.md` to keep CLI endpoints in sync with HTTP API changes +- [x] Prefix CLI API requests with `/v1` + +## HTTP API Endpoints +- [x] POST `/agents/{}/install` with `reinstall` handling +- [x] GET `/agents/{}/modes` (mode discovery or hardcoded) +- [x] GET `/agents` (installed/version/path; version checked at request time) +- [x] POST `/sessions/{}` (create session, install if needed, return health + agentSessionId) +- [x] POST `/sessions/{}/messages` (send prompt) +- [x] GET `/sessions/{}/events` (pagination with offset/limit) +- [x] GET `/sessions/{}/events/sse` (streaming) +- [x] POST `/sessions/{}/questions/{questionId}/reply` +- [x] POST `/sessions/{}/questions/{questionId}/reject` +- [x] POST `/sessions/{}/permissions/{permissionId}/reply` +- [x] Prefix all HTTP API endpoints with `/v1` + +## Agent Management +- [x] Implement install/version/spawn basics for Claude/Codex/OpenCode/Amp +- [x] Implement agent install URL patterns + platform mappings for supported OS/arch +- [x] Parse JSONL output for subprocess agents and extract session/result metadata +- [x] Map permissionMode to agent CLI flags (Claude/Codex/Amp) +- [x] Implement session resume flags for Claude/OpenCode/Amp (Codex unsupported) +- [x] Replace sandbox-daemon core agent modules with new agent-management crate (delete originals) +- [x] Stabilize agent-management crate API and fix build issues (sandbox-daemon currently wired to WIP crate) +- [x] Implement OpenCode shared server lifecycle (`opencode serve`, health, restart) +- [x] Implement OpenCode HTTP session APIs + SSE event stream integration +- [x] Implement JSONL parsing for subprocess agents and map to `UniversalEvent` +- [x] Capture agent session id from events and expose as `agentSessionId` +- [x] Handle agent process exit and map to `agent_process_exited` error +- [x] Implement agentMode discovery rules (OpenCode API, hardcoded others) +- [x] Enforce permissionMode behavior (default/plan/bypass) for subprocesses + +## Credentials +- [x] Implement credential extraction module (Claude/Codex/OpenCode) +- [x] Add Amp credential extraction (config-based) +- [x] Move credential extraction into `agent-credentials` crate +- [ ] Pass extracted credentials into subprocess env vars per agent +- [ ] Ensure OpenCode server reads credentials from config on startup + +## Testing +- [ ] Build a universal agent test suite that exercises all features (messages, questions, permissions, etc.) using HTTP API +- [ ] Run the full suite against every agent (Claude/Codex/OpenCode/Amp) without mocks +- [x] Add real install/version/spawn tests for Claude/Codex/OpenCode (Amp conditional) +- [x] Expand agent lifecycle tests (reinstall, session id extraction, resume, plan mode) +- [ ] Add OpenCode server-mode tests (session create, prompt, SSE) +- [ ] Add tests for question/permission flows using deterministic prompts + +## Frontend (frontend/packages/web) +- [x] Build Vite + React app with connect screen (endpoint + optional token) +- [x] Add instructions to run sandbox-daemon (including CORS) +- [x] Implement full agent UI covering all features +- [x] Add HTTP request log with copyable curl command + +## TypeScript SDK +- [x] Generate OpenAPI from utoipa and run `openapi-typescript` +- [x] Implement a thin fetch-based client wrapper +- [x] Update `CLAUDE.md` to require SDK + CLI updates when API changes +- [x] Prefix SDK requests with `/v1` + +## Examples + Tests +- [ ] Add examples for Docker, E2B, Daytona, Vercel Sandboxes, Cloudflare Sandboxes +- [ ] Add Vitest unit test for each example (Cloudflare requires special setup) + +## Documentation +- [ ] Write README covering architecture, agent compatibility, and deployment guide +- [ ] Add universal API feature checklist (questions, approve plan, etc.) +- [ ] Document CLI, HTTP API, frontend app, and TypeScript SDK usage +- [ ] Use collapsible sections for endpoints and SDK methods + +--- + +- implement release pipeline +- implement e2b example +- implement typescript "start locally" by pulling form server using version