chore: cargo fmt

This commit is contained in:
Nathan Flurry 2026-02-01 22:39:52 -08:00
parent a3e55a8976
commit 85ee3b30cd
33 changed files with 843 additions and 676 deletions

View file

@ -43,7 +43,9 @@ impl CredentialExtractionOptions {
} }
} }
pub fn extract_claude_credentials(options: &CredentialExtractionOptions) -> Option<ProviderCredentials> { pub fn extract_claude_credentials(
options: &CredentialExtractionOptions,
) -> Option<ProviderCredentials> {
let home_dir = options.home_dir.clone().unwrap_or_else(default_home_dir); let home_dir = options.home_dir.clone().unwrap_or_else(default_home_dir);
let include_oauth = options.include_oauth; let include_oauth = options.include_oauth;
@ -88,8 +90,7 @@ pub fn extract_claude_credentials(options: &CredentialExtractionOptions) -> Opti
}; };
let access = read_string_field(&data, &["claudeAiOauth", "accessToken"]); let access = read_string_field(&data, &["claudeAiOauth", "accessToken"]);
if let Some(token) = access { if let Some(token) = access {
if let Some(expires_at) = if let Some(expires_at) = read_string_field(&data, &["claudeAiOauth", "expiresAt"])
read_string_field(&data, &["claudeAiOauth", "expiresAt"])
{ {
if is_expired_rfc3339(&expires_at) { if is_expired_rfc3339(&expires_at) {
continue; continue;
@ -108,7 +109,9 @@ pub fn extract_claude_credentials(options: &CredentialExtractionOptions) -> Opti
None None
} }
pub fn extract_codex_credentials(options: &CredentialExtractionOptions) -> Option<ProviderCredentials> { pub fn extract_codex_credentials(
options: &CredentialExtractionOptions,
) -> Option<ProviderCredentials> {
let home_dir = options.home_dir.clone().unwrap_or_else(default_home_dir); let home_dir = options.home_dir.clone().unwrap_or_else(default_home_dir);
let include_oauth = options.include_oauth; let include_oauth = options.include_oauth;
let path = home_dir.join(".codex").join("auth.json"); let path = home_dir.join(".codex").join("auth.json");
@ -165,13 +168,13 @@ pub fn extract_opencode_credentials(options: &CredentialExtractionOptions) -> Ex
None => continue, None => continue,
}; };
let auth_type = config let auth_type = config.get("type").and_then(Value::as_str).unwrap_or("");
.get("type")
.and_then(Value::as_str)
.unwrap_or("");
let credentials = if auth_type == "api" { let credentials = if auth_type == "api" {
config.get("key").and_then(Value::as_str).map(|key| ProviderCredentials { config
.get("key")
.and_then(Value::as_str)
.map(|key| ProviderCredentials {
api_key: key.to_string(), api_key: key.to_string(),
source: "opencode".to_string(), source: "opencode".to_string(),
auth_type: AuthType::ApiKey, auth_type: AuthType::ApiKey,
@ -214,7 +217,9 @@ pub fn extract_opencode_credentials(options: &CredentialExtractionOptions) -> Ex
} else if provider_name == "openai" { } else if provider_name == "openai" {
result.openai = Some(credentials.clone()); result.openai = Some(credentials.clone());
} else { } else {
result.other.insert(provider_name.to_string(), credentials.clone()); result
.other
.insert(provider_name.to_string(), credentials.clone());
} }
} }
} }
@ -222,7 +227,9 @@ pub fn extract_opencode_credentials(options: &CredentialExtractionOptions) -> Ex
result result
} }
pub fn extract_amp_credentials(options: &CredentialExtractionOptions) -> Option<ProviderCredentials> { pub fn extract_amp_credentials(
options: &CredentialExtractionOptions,
) -> Option<ProviderCredentials> {
let home_dir = options.home_dir.clone().unwrap_or_else(default_home_dir); let home_dir = options.home_dir.clone().unwrap_or_else(default_home_dir);
let path = home_dir.join(".amp").join("config.json"); let path = home_dir.join(".amp").join("config.json");
let data = read_json_file(&path)?; let data = read_json_file(&path)?;

View file

@ -3,15 +3,7 @@ use std::fmt;
use std::fs; use std::fs;
use std::io::{self, BufRead, BufReader, Read, Write}; use std::io::{self, BufRead, BufReader, Read, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::{ use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
Child,
ChildStderr,
ChildStdin,
ChildStdout,
Command,
ExitStatus,
Stdio,
};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use flate2::read::GzDecoder; use flate2::read::GzDecoder;
@ -124,17 +116,18 @@ impl AgentManager {
}) })
} }
pub fn with_platform( pub fn with_platform(install_dir: impl Into<PathBuf>, platform: Platform) -> Self {
install_dir: impl Into<PathBuf>,
platform: Platform,
) -> Self {
Self { Self {
install_dir: install_dir.into(), install_dir: install_dir.into(),
platform, platform,
} }
} }
pub fn install(&self, agent: AgentId, options: InstallOptions) -> Result<InstallResult, AgentError> { pub fn install(
&self,
agent: AgentId,
options: InstallOptions,
) -> Result<InstallResult, AgentError> {
let install_path = self.binary_path(agent); let install_path = self.binary_path(agent);
if !options.reinstall { if !options.reinstall {
if let Ok(existing_path) = self.resolve_binary(agent) { if let Ok(existing_path) = self.resolve_binary(agent) {
@ -148,9 +141,15 @@ impl AgentManager {
fs::create_dir_all(&self.install_dir)?; fs::create_dir_all(&self.install_dir)?;
match agent { match agent {
AgentId::Claude => install_claude(&install_path, self.platform, options.version.as_deref())?, AgentId::Claude => {
AgentId::Codex => install_codex(&install_path, self.platform, options.version.as_deref())?, install_claude(&install_path, self.platform, options.version.as_deref())?
AgentId::Opencode => install_opencode(&install_path, self.platform, options.version.as_deref())?, }
AgentId::Codex => {
install_codex(&install_path, self.platform, options.version.as_deref())?
}
AgentId::Opencode => {
install_opencode(&install_path, self.platform, options.version.as_deref())?
}
AgentId::Amp => install_amp(&install_path, self.platform, options.version.as_deref())?, AgentId::Amp => install_amp(&install_path, self.platform, options.version.as_deref())?,
AgentId::Mock => { AgentId::Mock => {
if !install_path.exists() { if !install_path.exists() {
@ -256,10 +255,7 @@ impl AgentManager {
command.arg("app-server"); command.arg("app-server");
} }
AgentId::Opencode => { AgentId::Opencode => {
command command.arg("run").arg("--format").arg("json");
.arg("run")
.arg("--format")
.arg("json");
if let Some(model) = options.model.as_deref() { if let Some(model) = options.model.as_deref() {
command.arg("-m").arg(model); command.arg("-m").arg(model);
} }
@ -346,10 +342,15 @@ impl AgentManager {
fn spawn_codex_app_server(&self, options: SpawnOptions) -> Result<SpawnResult, AgentError> { fn spawn_codex_app_server(&self, options: SpawnOptions) -> Result<SpawnResult, AgentError> {
if options.session_id.is_some() { if options.session_id.is_some() {
return Err(AgentError::ResumeUnsupported { agent: AgentId::Codex }); return Err(AgentError::ResumeUnsupported {
agent: AgentId::Codex,
});
} }
let mut command = self.build_command(AgentId::Codex, &options)?; let mut command = self.build_command(AgentId::Codex, &options)?;
command.stdin(Stdio::piped()).stdout(Stdio::piped()).stderr(Stdio::piped()); command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for (key, value) in options.env { for (key, value) in options.env {
command.env(key, value); command.env(key, value);
} }
@ -417,8 +418,8 @@ impl AgentManager {
Ok(value) => value, Ok(value) => value,
Err(_) => continue, Err(_) => continue,
}; };
let message: codex_schema::JsonrpcMessage = let message: codex_schema::JsonrpcMessage = match serde_json::from_value(value.clone())
match serde_json::from_value(value.clone()) { {
Ok(message) => message, Ok(message) => message,
Err(_) => continue, Err(_) => continue,
}; };
@ -443,11 +444,16 @@ impl AgentManager {
params.cwd = cwd.clone(); params.cwd = cwd.clone();
send_json_line( send_json_line(
&mut stdin, &mut stdin,
&codex_schema::ClientRequest::ThreadStart { id: request_id, params }, &codex_schema::ClientRequest::ThreadStart {
id: request_id,
params,
},
)?; )?;
thread_start_id = Some(request_id_str); thread_start_id = Some(request_id_str);
thread_start_sent = true; thread_start_sent = true;
} else if thread_start_id.as_deref() == Some(&response_id) && thread_id.is_none() { } else if thread_start_id.as_deref() == Some(&response_id)
&& thread_id.is_none()
{
thread_id = codex_thread_id_from_response(&response.result); thread_id = codex_thread_id_from_response(&response.result);
} }
events.push(value); events.push(value);
@ -466,8 +472,11 @@ impl AgentManager {
) { ) {
completed = true; completed = true;
} }
if let codex_schema::ServerNotification::ItemCompleted(params) = &notification { if let codex_schema::ServerNotification::ItemCompleted(params) =
if matches!(params.item, codex_schema::ThreadItem::AgentMessage { .. }) { &notification
{
if matches!(params.item, codex_schema::ThreadItem::AgentMessage { .. })
{
completed = true; completed = true;
} }
} }
@ -809,17 +818,35 @@ fn codex_thread_id_from_notification(
codex_schema::ServerNotification::TurnCompleted(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::TurnCompleted(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemStarted(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::ItemStarted(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemCompleted(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::ItemCompleted(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemAgentMessageDelta(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::ItemAgentMessageDelta(params) => {
codex_schema::ServerNotification::ItemReasoningTextDelta(params) => Some(params.thread_id.clone()), Some(params.thread_id.clone())
codex_schema::ServerNotification::ItemReasoningSummaryTextDelta(params) => Some(params.thread_id.clone()), }
codex_schema::ServerNotification::ItemCommandExecutionOutputDelta(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::ItemReasoningTextDelta(params) => {
codex_schema::ServerNotification::ItemFileChangeOutputDelta(params) => Some(params.thread_id.clone()), Some(params.thread_id.clone())
codex_schema::ServerNotification::ItemMcpToolCallProgress(params) => Some(params.thread_id.clone()), }
codex_schema::ServerNotification::ThreadTokenUsageUpdated(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::ItemReasoningSummaryTextDelta(params) => {
Some(params.thread_id.clone())
}
codex_schema::ServerNotification::ItemCommandExecutionOutputDelta(params) => {
Some(params.thread_id.clone())
}
codex_schema::ServerNotification::ItemFileChangeOutputDelta(params) => {
Some(params.thread_id.clone())
}
codex_schema::ServerNotification::ItemMcpToolCallProgress(params) => {
Some(params.thread_id.clone())
}
codex_schema::ServerNotification::ThreadTokenUsageUpdated(params) => {
Some(params.thread_id.clone())
}
codex_schema::ServerNotification::TurnDiffUpdated(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::TurnDiffUpdated(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::TurnPlanUpdated(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::TurnPlanUpdated(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemCommandExecutionTerminalInteraction(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::ItemCommandExecutionTerminalInteraction(params) => {
codex_schema::ServerNotification::ItemReasoningSummaryPartAdded(params) => Some(params.thread_id.clone()), Some(params.thread_id.clone())
}
codex_schema::ServerNotification::ItemReasoningSummaryPartAdded(params) => {
Some(params.thread_id.clone())
}
codex_schema::ServerNotification::ThreadCompacted(params) => Some(params.thread_id.clone()), codex_schema::ServerNotification::ThreadCompacted(params) => Some(params.thread_id.clone()),
_ => None, _ => None,
} }
@ -902,7 +929,8 @@ fn extract_session_id(agent: AgentId, events: &[Value]) -> Option<String> {
if let Some(id) = extract_nested_string(event, &["properties", "sessionID"]) { if let Some(id) = extract_nested_string(event, &["properties", "sessionID"]) {
return Some(id); return Some(id);
} }
if let Some(id) = extract_nested_string(event, &["properties", "part", "sessionID"]) { if let Some(id) = extract_nested_string(event, &["properties", "part", "sessionID"])
{
return Some(id); return Some(id);
} }
if let Some(id) = extract_nested_string(event, &["session", "id"]) { if let Some(id) = extract_nested_string(event, &["session", "id"]) {
@ -925,7 +953,9 @@ fn extract_result_text(agent: AgentId, events: &[Value]) -> Option<String> {
if let Some(result) = event.get("result").and_then(Value::as_str) { if let Some(result) = event.get("result").and_then(Value::as_str) {
return Some(result.to_string()); return Some(result.to_string());
} }
if let Some(text) = extract_nested_string(event, &["message", "content", "0", "text"]) { if let Some(text) =
extract_nested_string(event, &["message", "content", "0", "text"])
{
return Some(text); return Some(text);
} }
} }
@ -974,7 +1004,9 @@ fn extract_result_text(agent: AgentId, events: &[Value]) -> Option<String> {
if let Some(delta) = extract_nested_string(event, &["properties", "delta"]) { if let Some(delta) = extract_nested_string(event, &["properties", "delta"]) {
buffer.push_str(&delta); buffer.push_str(&delta);
} }
if let Some(content) = extract_nested_string(event, &["properties", "part", "content"]) { if let Some(content) =
extract_nested_string(event, &["properties", "part", "content"])
{
buffer.push_str(&content); buffer.push_str(&content);
} }
} }
@ -1178,14 +1210,19 @@ fn download_bytes(url: &Url) -> Result<Vec<u8>, AgentError> {
Ok(bytes) Ok(bytes)
} }
fn install_claude(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> { fn install_claude(
path: &Path,
platform: Platform,
version: Option<&str>,
) -> Result<(), AgentError> {
let version = match version { let version = match version {
Some(version) => version.to_string(), Some(version) => version.to_string(),
None => { None => {
let url = Url::parse( let url = Url::parse(
"https://storage.googleapis.com/claude-code-dist-86c565f3-f756-42ad-8dfa-d59b1c096819/claude-code-releases/latest", "https://storage.googleapis.com/claude-code-dist-86c565f3-f756-42ad-8dfa-d59b1c096819/claude-code-releases/latest",
)?; )?;
let text = String::from_utf8(download_bytes(&url)?).map_err(|err| AgentError::ExtractFailed(err.to_string()))?; let text = String::from_utf8(download_bytes(&url)?)
.map_err(|err| AgentError::ExtractFailed(err.to_string()))?;
text.trim().to_string() text.trim().to_string()
} }
}; };
@ -1210,8 +1247,11 @@ fn install_amp(path: &Path, platform: Platform, version: Option<&str>) -> Result
let version = match version { let version = match version {
Some(version) => version.to_string(), Some(version) => version.to_string(),
None => { None => {
let url = Url::parse("https://storage.googleapis.com/amp-public-assets-prod-0/cli/cli-version.txt")?; let url = Url::parse(
let text = String::from_utf8(download_bytes(&url)?).map_err(|err| AgentError::ExtractFailed(err.to_string()))?; "https://storage.googleapis.com/amp-public-assets-prod-0/cli/cli-version.txt",
)?;
let text = String::from_utf8(download_bytes(&url)?)
.map_err(|err| AgentError::ExtractFailed(err.to_string()))?;
text.trim().to_string() text.trim().to_string()
} }
}; };
@ -1261,7 +1301,11 @@ fn install_codex(path: &Path, platform: Platform, version: Option<&str>) -> Resu
Ok(()) Ok(())
} }
fn install_opencode(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> { fn install_opencode(
path: &Path,
platform: Platform,
version: Option<&str>,
) -> Result<(), AgentError> {
match platform { match platform {
Platform::MacosArm64 => { Platform::MacosArm64 => {
let url = match version { let url = match version {
@ -1317,7 +1361,8 @@ fn install_opencode(path: &Path, platform: Platform, version: Option<&str>) -> R
fn install_zip_binary(path: &Path, url: &Url, binary_name: &str) -> Result<(), AgentError> { fn install_zip_binary(path: &Path, url: &Url, binary_name: &str) -> Result<(), AgentError> {
let bytes = download_bytes(url)?; let bytes = download_bytes(url)?;
let reader = io::Cursor::new(bytes); let reader = io::Cursor::new(bytes);
let mut archive = zip::ZipArchive::new(reader).map_err(|err| AgentError::ExtractFailed(err.to_string()))?; let mut archive =
zip::ZipArchive::new(reader).map_err(|err| AgentError::ExtractFailed(err.to_string()))?;
let temp_dir = tempfile::tempdir()?; let temp_dir = tempfile::tempdir()?;
for i in 0..archive.len() { for i in 0..archive.len() {
let mut file = archive let mut file = archive

View file

@ -285,10 +285,7 @@ fn handle_health_response(
}) })
} }
fn run_blocking_check<F>( fn run_blocking_check<F>(provider: &str, check: F) -> Result<(), TestAgentConfigError>
provider: &str,
check: F,
) -> Result<(), TestAgentConfigError>
where where
F: FnOnce() -> Result<(), TestAgentConfigError> + Send + 'static, F: FnOnce() -> Result<(), TestAgentConfigError> + Send + 'static,
{ {
@ -301,7 +298,12 @@ where
} }
fn detect_system_agents() -> Vec<AgentId> { fn detect_system_agents() -> Vec<AgentId> {
let candidates = [AgentId::Claude, AgentId::Codex, AgentId::Opencode, AgentId::Amp]; let candidates = [
AgentId::Claude,
AgentId::Codex,
AgentId::Opencode,
AgentId::Amp,
];
let install_dir = default_install_dir(); let install_dir = default_install_dir();
candidates candidates
.into_iter() .into_iter()

View file

@ -123,7 +123,10 @@ pub enum SandboxError {
#[error("agent not installed: {agent}")] #[error("agent not installed: {agent}")]
AgentNotInstalled { agent: String }, AgentNotInstalled { agent: String },
#[error("install failed: {agent}")] #[error("install failed: {agent}")]
InstallFailed { agent: String, stderr: Option<String> }, InstallFailed {
agent: String,
stderr: Option<String>,
},
#[error("agent process exited: {agent}")] #[error("agent process exited: {agent}")]
AgentProcessExited { AgentProcessExited {
agent: String, agent: String,
@ -167,9 +170,7 @@ impl SandboxError {
pub fn to_agent_error(&self) -> AgentError { pub fn to_agent_error(&self) -> AgentError {
let (agent, session_id, details) = match self { let (agent, session_id, details) = match self {
Self::InvalidRequest { .. } => (None, None, None), Self::InvalidRequest { .. } => (None, None, None),
Self::UnsupportedAgent { agent } => { Self::UnsupportedAgent { agent } => (Some(agent.clone()), None, None),
(Some(agent.clone()), None, None)
}
Self::AgentNotInstalled { agent } => (Some(agent.clone()), None, None), Self::AgentNotInstalled { agent } => (Some(agent.clone()), None, None),
Self::InstallFailed { agent, stderr } => { Self::InstallFailed { agent, stderr } => {
let mut map = Map::new(); let mut map = Map::new();
@ -179,7 +180,11 @@ impl SandboxError {
( (
Some(agent.clone()), Some(agent.clone()),
None, None,
if map.is_empty() { None } else { Some(Value::Object(map)) }, if map.is_empty() {
None
} else {
Some(Value::Object(map))
},
) )
} }
Self::AgentProcessExited { Self::AgentProcessExited {
@ -200,7 +205,11 @@ impl SandboxError {
( (
Some(agent.clone()), Some(agent.clone()),
None, None,
if map.is_empty() { None } else { Some(Value::Object(map)) }, if map.is_empty() {
None
} else {
Some(Value::Object(map))
},
) )
} }
Self::TokenInvalid { message } => { Self::TokenInvalid { message } => {
@ -219,20 +228,12 @@ impl SandboxError {
}); });
(None, None, details) (None, None, details)
} }
Self::SessionNotFound { session_id } => { Self::SessionNotFound { session_id } => (None, Some(session_id.clone()), None),
(None, Some(session_id.clone()), None) Self::SessionAlreadyExists { session_id } => (None, Some(session_id.clone()), None),
}
Self::SessionAlreadyExists { session_id } => {
(None, Some(session_id.clone()), None)
}
Self::ModeNotSupported { agent, mode } => { Self::ModeNotSupported { agent, mode } => {
let mut map = Map::new(); let mut map = Map::new();
map.insert("mode".to_string(), Value::String(mode.clone())); map.insert("mode".to_string(), Value::String(mode.clone()));
( (Some(agent.clone()), None, Some(Value::Object(map)))
Some(agent.clone()),
None,
Some(Value::Object(map)),
)
} }
Self::StreamError { message } => { Self::StreamError { message } => {
let mut map = Map::new(); let mut map = Map::new();

View file

@ -17,10 +17,7 @@ fn main() {
let schema_path = schema_dir.join(file); let schema_path = schema_dir.join(file);
// Tell cargo to rerun if schema changes // Tell cargo to rerun if schema changes
emit_stdout(&format!( emit_stdout(&format!("cargo:rerun-if-changed={}", schema_path.display()));
"cargo:rerun-if-changed={}",
schema_path.display()
));
if !schema_path.exists() { if !schema_path.exists() {
emit_stdout(&format!( emit_stdout(&format!(
@ -48,9 +45,10 @@ fn main() {
let contents = type_space.to_stream(); let contents = type_space.to_stream();
// Format the generated code // Format the generated code
let formatted = prettyplease::unparse(&syn::parse2(contents.clone()).unwrap_or_else(|e| { let formatted = prettyplease::unparse(
panic!("Failed to parse generated code for {}: {}", name, e) &syn::parse2(contents.clone())
})); .unwrap_or_else(|e| panic!("Failed to parse generated code for {}: {}", name, e)),
);
let out_path = Path::new(&out_dir).join(format!("{}.rs", name)); let out_path = Path::new(&out_dir).join(format!("{}.rs", name));
fs::write(&out_path, formatted) fs::write(&out_path, formatted)

View file

@ -17,16 +17,14 @@ fn test_claude_bash_input() {
#[test] #[test]
fn test_codex_server_notification() { fn test_codex_server_notification() {
let notification = codex::ServerNotification::ItemCompleted( let notification = codex::ServerNotification::ItemCompleted(codex::ItemCompletedNotification {
codex::ItemCompletedNotification {
item: codex::ThreadItem::AgentMessage { item: codex::ThreadItem::AgentMessage {
id: "msg-123".to_string(), id: "msg-123".to_string(),
text: "Hello from Codex".to_string(), text: "Hello from Codex".to_string(),
}, },
thread_id: "thread-123".to_string(), thread_id: "thread-123".to_string(),
turn_id: "turn-456".to_string(), turn_id: "turn-456".to_string(),
}, });
);
let json = serde_json::to_string(&notification).unwrap(); let json = serde_json::to_string(&notification).unwrap();
assert!(json.contains("item/completed")); assert!(json.contains("item/completed"));

View file

@ -13,8 +13,7 @@ fn main() {
let out_path = Path::new(&out_dir).join("openapi.json"); let out_path = Path::new(&out_dir).join("openapi.json");
let openapi = ApiDoc::openapi(); let openapi = ApiDoc::openapi();
let json = serde_json::to_string_pretty(&openapi) let json = serde_json::to_string_pretty(&openapi).expect("Failed to serialize OpenAPI spec");
.expect("Failed to serialize OpenAPI spec");
fs::write(&out_path, json).expect("Failed to write OpenAPI spec"); fs::write(&out_path, json).expect("Failed to write OpenAPI spec");
emit_stdout(&format!( emit_stdout(&format!(

View file

@ -47,7 +47,11 @@ fn init_logging() {
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
tracing_subscriber::registry() tracing_subscriber::registry()
.with(filter) .with(filter)
.with(tracing_logfmt::builder().layer().with_writer(std::io::stderr)) .with(
tracing_logfmt::builder()
.layer()
.with_writer(std::io::stderr),
)
.init(); .init();
} }

View file

@ -3,8 +3,8 @@ use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use sandbox_agent_error::SandboxError; use sandbox_agent_error::SandboxError;
use time::{Duration, OffsetDateTime};
use sandbox_agent_universal_agent_schema::StderrOutput; use sandbox_agent_universal_agent_schema::StderrOutput;
use time::{Duration, OffsetDateTime};
const LOG_RETENTION_DAYS: i64 = 7; const LOG_RETENTION_DAYS: i64 = 7;
const LOG_HEAD_LINES: usize = 20; const LOG_HEAD_LINES: usize = 20;

View file

@ -1,7 +1,7 @@
//! Sandbox agent core utilities. //! Sandbox agent core utilities.
pub mod credentials;
mod agent_server_logs; mod agent_server_logs;
pub mod credentials;
pub mod router; pub mod router;
pub mod telemetry; pub mod telemetry;
pub mod ui; pub mod ui;

View file

@ -25,8 +25,7 @@ use sandbox_agent_universal_agent_schema::{
ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData,
PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason, PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason,
SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent, SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent,
UniversalEventData, UniversalEventData, UniversalEventType, UniversalItem,
UniversalEventType, UniversalItem,
}; };
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -37,6 +36,7 @@ use tokio_stream::wrappers::BroadcastStream;
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use utoipa::{Modify, OpenApi, ToSchema}; use utoipa::{Modify, OpenApi, ToSchema};
use crate::agent_server_logs::AgentServerLogs;
use crate::ui; use crate::ui;
use sandbox_agent_agent_management::agents::{ use sandbox_agent_agent_management::agents::{
AgentError as ManagerError, AgentId, AgentManager, InstallOptions, SpawnOptions, StreamingSpawn, AgentError as ManagerError, AgentId, AgentManager, InstallOptions, SpawnOptions, StreamingSpawn,
@ -44,7 +44,6 @@ use sandbox_agent_agent_management::agents::{
use sandbox_agent_agent_management::credentials::{ use sandbox_agent_agent_management::credentials::{
extract_all_credentials, CredentialExtractionOptions, ExtractedCredentials, extract_all_credentials, CredentialExtractionOptions, ExtractedCredentials,
}; };
use crate::agent_server_logs::AgentServerLogs;
const MOCK_EVENT_DELAY_MS: u64 = 200; const MOCK_EVENT_DELAY_MS: u64 = 200;
static USER_MESSAGE_COUNTER: AtomicU64 = AtomicU64::new(1); static USER_MESSAGE_COUNTER: AtomicU64 = AtomicU64::new(1);
@ -99,7 +98,10 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
.route("/sessions", get(list_sessions)) .route("/sessions", get(list_sessions))
.route("/sessions/:session_id", post(create_session)) .route("/sessions/:session_id", post(create_session))
.route("/sessions/:session_id/messages", post(post_message)) .route("/sessions/:session_id/messages", post(post_message))
.route("/sessions/:session_id/messages/stream", post(post_message_stream)) .route(
"/sessions/:session_id/messages/stream",
post(post_message_stream),
)
.route("/sessions/:session_id/terminate", post(terminate_session)) .route("/sessions/:session_id/terminate", post(terminate_session))
.route("/sessions/:session_id/events", get(get_events)) .route("/sessions/:session_id/events", get(get_events))
.route("/sessions/:session_id/events/sse", get(get_events_sse)) .route("/sessions/:session_id/events/sse", get(get_events_sse))
@ -1142,8 +1144,8 @@ impl AgentServerManager {
) -> Result<(String, Arc<std::sync::Mutex<Option<std::process::Child>>>), SandboxError> { ) -> Result<(String, Arc<std::sync::Mutex<Option<std::process::Child>>>), SandboxError> {
let manager = self.agent_manager.clone(); let manager = self.agent_manager.clone();
let log_dir = self.log_base_dir.clone(); let log_dir = self.log_base_dir.clone();
let (base_url, child) = let (base_url, child) = tokio::task::spawn_blocking(
tokio::task::spawn_blocking(move || -> Result<(String, std::process::Child), SandboxError> { move || -> Result<(String, std::process::Child), SandboxError> {
let path = manager let path = manager
.resolve_binary(agent) .resolve_binary(agent)
.map_err(|err| map_spawn_error(agent, err))?; .map_err(|err| map_spawn_error(agent, err))?;
@ -1160,16 +1162,14 @@ impl AgentServerManager {
message: err.to_string(), message: err.to_string(),
})?; })?;
Ok((format!("http://127.0.0.1:{port}"), child)) Ok((format!("http://127.0.0.1:{port}"), child))
}) },
)
.await .await
.map_err(|err| SandboxError::StreamError { .map_err(|err| SandboxError::StreamError {
message: err.to_string(), message: err.to_string(),
})??; })??;
Ok(( Ok((base_url, Arc::new(std::sync::Mutex::new(Some(child)))))
base_url,
Arc::new(std::sync::Mutex::new(Some(child))),
))
} }
async fn spawn_stdio_server( async fn spawn_stdio_server(
@ -1188,7 +1188,8 @@ impl AgentServerManager {
let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::<String>(); let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::<String>();
let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::<String>(); let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::<String>();
let child = tokio::task::spawn_blocking(move || -> Result<std::process::Child, SandboxError> { let child =
tokio::task::spawn_blocking(move || -> Result<std::process::Child, SandboxError> {
let path = manager let path = manager
.resolve_binary(agent) .resolve_binary(agent)
.map_err(|err| map_spawn_error(agent, err))?; .map_err(|err| map_spawn_error(agent, err))?;
@ -1204,10 +1205,16 @@ impl AgentServerManager {
message: err.to_string(), message: err.to_string(),
})?; })?;
let stdin = child.stdin.take().ok_or_else(|| SandboxError::StreamError { let stdin = child
.stdin
.take()
.ok_or_else(|| SandboxError::StreamError {
message: "codex stdin unavailable".to_string(), message: "codex stdin unavailable".to_string(),
})?; })?;
let stdout = child.stdout.take().ok_or_else(|| SandboxError::StreamError { let stdout = child
.stdout
.take()
.ok_or_else(|| SandboxError::StreamError {
message: "codex stdout unavailable".to_string(), message: "codex stdout unavailable".to_string(),
})?; })?;
@ -1348,7 +1355,10 @@ impl AgentServerManager {
} }
} }
async fn ensure_server_for_restart(self: Arc<Self>, agent: AgentId) -> Result<(), SandboxError> { async fn ensure_server_for_restart(
self: Arc<Self>,
agent: AgentId,
) -> Result<(), SandboxError> {
sleep(Duration::from_millis(500)).await; sleep(Duration::from_millis(500)).await;
match agent { match agent {
AgentId::Opencode => { AgentId::Opencode => {
@ -1446,26 +1456,24 @@ impl SessionManager {
} }
} }
fn session_ref<'a>( fn session_ref<'a>(sessions: &'a [SessionState], session_id: &str) -> Option<&'a SessionState> {
sessions: &'a [SessionState], sessions
session_id: &str, .iter()
) -> Option<&'a SessionState> { .find(|session| session.session_id == session_id)
sessions.iter().find(|session| session.session_id == session_id)
} }
fn session_mut<'a>( fn session_mut<'a>(
sessions: &'a mut [SessionState], sessions: &'a mut [SessionState],
session_id: &str, session_id: &str,
) -> Option<&'a mut SessionState> { ) -> Option<&'a mut SessionState> {
sessions.iter_mut().find(|session| session.session_id == session_id) sessions
.iter_mut()
.find(|session| session.session_id == session_id)
} }
/// Read agent stderr for error diagnostics /// Read agent stderr for error diagnostics
fn read_agent_stderr(&self, agent: AgentId) -> Option<StderrOutput> { fn read_agent_stderr(&self, agent: AgentId) -> Option<StderrOutput> {
let logs = AgentServerLogs::new( let logs = AgentServerLogs::new(self.server_manager.log_base_dir.clone(), agent.as_str());
self.server_manager.log_base_dir.clone(),
agent.as_str(),
);
logs.read_stderr() logs.read_stderr()
} }
@ -1477,7 +1485,10 @@ impl SessionManager {
let agent_id = parse_agent_id(&request.agent)?; let agent_id = parse_agent_id(&request.agent)?;
{ {
let sessions = self.sessions.lock().await; let sessions = self.sessions.lock().await;
if sessions.iter().any(|session| session.session_id == session_id) { if sessions
.iter()
.any(|session| session.session_id == session_id)
{
return Err(SandboxError::SessionAlreadyExists { session_id }); return Err(SandboxError::SessionAlreadyExists { session_id });
} }
} }
@ -1676,10 +1687,7 @@ impl SessionManager {
Ok(()) Ok(())
} }
async fn emit_synthetic_assistant_start( async fn emit_synthetic_assistant_start(&self, session_id: &str) -> Result<(), SandboxError> {
&self,
session_id: &str,
) -> Result<(), SandboxError> {
let conversion = { let conversion = {
let mut sessions = self.sessions.lock().await; let mut sessions = self.sessions.lock().await;
let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| { let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| {
@ -1689,7 +1697,9 @@ impl SessionManager {
})?; })?;
session.enqueue_pending_assistant_start() session.enqueue_pending_assistant_start()
}; };
let _ = self.record_conversions(session_id, vec![conversion]).await?; let _ = self
.record_conversions(session_id, vec![conversion])
.await?;
Ok(()) Ok(())
} }
@ -1906,10 +1916,14 @@ impl SessionManager {
let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest { let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(), message: "Claude session is not active".to_string(),
})?; })?;
let session_id = native_session_id.clone().unwrap_or_else(|| session_id.to_string()); let session_id = native_session_id
.clone()
.unwrap_or_else(|| session_id.to_string());
let response_text = response.clone().unwrap_or_default(); let response_text = response.clone().unwrap_or_default();
let line = claude_tool_result_line(&session_id, question_id, &response_text, false); let line = claude_tool_result_line(&session_id, question_id, &response_text, false);
sender.send(line).map_err(|_| SandboxError::InvalidRequest { sender
.send(line)
.map_err(|_| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(), message: "Claude session is not active".to_string(),
})?; })?;
} else { } else {
@ -1977,14 +1991,18 @@ impl SessionManager {
let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest { let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(), message: "Claude session is not active".to_string(),
})?; })?;
let session_id = native_session_id.clone().unwrap_or_else(|| session_id.to_string()); let session_id = native_session_id
.clone()
.unwrap_or_else(|| session_id.to_string());
let line = claude_tool_result_line( let line = claude_tool_result_line(
&session_id, &session_id,
question_id, question_id,
"User rejected the question.", "User rejected the question.",
true, true,
); );
sender.send(line).map_err(|_| SandboxError::InvalidRequest { sender
.send(line)
.map_err(|_| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(), message: "Claude session is not active".to_string(),
})?; })?;
} else { } else {
@ -2140,7 +2158,9 @@ impl SessionManager {
}; };
let line = claude_control_response_line(permission_id, behavior, response_value); let line = claude_control_response_line(permission_id, behavior, response_value);
sender.send(line).map_err(|_| SandboxError::InvalidRequest { sender
.send(line)
.map_err(|_| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(), message: "Claude session is not active".to_string(),
})?; })?;
} else { } else {
@ -2812,7 +2832,8 @@ impl SessionManager {
Err(_) => continue, Err(_) => continue,
}; };
let message: codex_schema::JsonrpcMessage = match serde_json::from_value(value.clone()) { let message: codex_schema::JsonrpcMessage = match serde_json::from_value(value.clone())
{
Ok(m) => m, Ok(m) => m,
Err(_) => continue, Err(_) => continue,
}; };
@ -2829,11 +2850,16 @@ impl SessionManager {
if let Ok(notification) = if let Ok(notification) =
serde_json::from_value::<codex_schema::ServerNotification>(value.clone()) serde_json::from_value::<codex_schema::ServerNotification>(value.clone())
{ {
if let Some(thread_id) = codex_thread_id_from_server_notification(&notification) { if let Some(thread_id) =
codex_thread_id_from_server_notification(&notification)
{
if let Some(session_id) = server.session_for_thread(&thread_id) { if let Some(session_id) = server.session_for_thread(&thread_id) {
let conversions = match convert_codex::notification_to_universal(&notification) { let conversions =
match convert_codex::notification_to_universal(&notification) {
Ok(c) => c, Ok(c) => c,
Err(err) => vec![agent_unparsed("codex", &err, value.clone())], Err(err) => {
vec![agent_unparsed("codex", &err, value.clone())]
}
}; };
let _ = self.record_conversions(&session_id, conversions).await; let _ = self.record_conversions(&session_id, conversions).await;
} }
@ -2852,7 +2878,8 @@ impl SessionManager {
for conversion in &mut conversions { for conversion in &mut conversions {
conversion.raw = Some(value.clone()); conversion.raw = Some(value.clone());
} }
let _ = self.record_conversions(&session_id, conversions).await; let _ =
self.record_conversions(&session_id, conversions).await;
} }
Err(err) => { Err(err) => {
let _ = self let _ = self
@ -2983,7 +3010,8 @@ impl SessionManager {
) -> Result<(), SandboxError> { ) -> Result<(), SandboxError> {
let server = self.ensure_codex_server().await?; let server = self.ensure_codex_server().await?;
let thread_id = session let thread_id =
session
.native_session_id .native_session_id
.as_ref() .as_ref()
.ok_or_else(|| SandboxError::InvalidRequest { .ok_or_else(|| SandboxError::InvalidRequest {
@ -3431,14 +3459,22 @@ pub struct EventsQuery {
pub offset: Option<u64>, pub offset: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub limit: Option<u64>, pub limit: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none", alias = "include_raw")] #[serde(
default,
skip_serializing_if = "Option::is_none",
alias = "include_raw"
)]
pub include_raw: Option<bool>, pub include_raw: Option<bool>,
} }
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)] #[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct TurnStreamQuery { pub struct TurnStreamQuery {
#[serde(default, skip_serializing_if = "Option::is_none", alias = "include_raw")] #[serde(
default,
skip_serializing_if = "Option::is_none",
alias = "include_raw"
)]
pub include_raw: Option<bool>, pub include_raw: Option<bool>,
} }
@ -3554,7 +3590,10 @@ async fn get_root() -> &'static str {
} }
async fn not_found() -> (StatusCode, String) { async fn not_found() -> (StatusCode, String) {
(StatusCode::NOT_FOUND, format!("404 Not Found\n\n{SERVER_INFO}")) (
StatusCode::NOT_FOUND,
format!("404 Not Found\n\n{SERVER_INFO}"),
)
} }
#[utoipa::path( #[utoipa::path(
@ -3581,7 +3620,8 @@ async fn list_agents(
let manager = state.agent_manager.clone(); let manager = state.agent_manager.clone();
let server_statuses = state.session_manager.server_manager.status_snapshot().await; let server_statuses = state.session_manager.server_manager.status_snapshot().await;
let agents = tokio::task::spawn_blocking(move || { let agents =
tokio::task::spawn_blocking(move || {
all_agents() all_agents()
.into_iter() .into_iter()
.map(|agent_id| { .map(|agent_id| {
@ -3591,19 +3631,17 @@ async fn list_agents(
let capabilities = agent_capabilities_for(agent_id); let capabilities = agent_capabilities_for(agent_id);
// Add server_status for agents with shared processes // Add server_status for agents with shared processes
let server_status = if capabilities.shared_process { let server_status =
Some( if capabilities.shared_process {
server_statuses Some(server_statuses.get(&agent_id).cloned().unwrap_or(
.get(&agent_id) ServerStatusInfo {
.cloned()
.unwrap_or(ServerStatusInfo {
status: ServerStatus::Stopped, status: ServerStatus::Stopped,
base_url: None, base_url: None,
uptime_ms: None, uptime_ms: None,
restart_count: 0, restart_count: 0,
last_error: None, last_error: None,
}), },
) ))
} else { } else {
None None
}; };
@ -3908,7 +3946,10 @@ fn all_agents() -> [AgentId; 5] {
/// Returns true if the agent supports resuming a session after its process exits. /// Returns true if the agent supports resuming a session after its process exits.
/// These agents can use --resume/--continue to continue a conversation. /// These agents can use --resume/--continue to continue a conversation.
fn agent_supports_resume(agent: AgentId) -> bool { fn agent_supports_resume(agent: AgentId) -> bool {
matches!(agent, AgentId::Claude | AgentId::Amp | AgentId::Opencode | AgentId::Codex) matches!(
agent,
AgentId::Claude | AgentId::Amp | AgentId::Opencode | AgentId::Codex
)
} }
fn agent_supports_item_started(agent: AgentId) -> bool { fn agent_supports_item_started(agent: AgentId) -> bool {
@ -4076,7 +4117,8 @@ fn agent_modes_for(agent: AgentId) -> Vec<AgentModeInfo> {
name: "Build".to_string(), name: "Build".to_string(),
description: "Default build mode".to_string(), description: "Default build mode".to_string(),
}], }],
AgentId::Mock => vec![AgentModeInfo { AgentId::Mock => vec![
AgentModeInfo {
id: "build".to_string(), id: "build".to_string(),
name: "Build".to_string(), name: "Build".to_string(),
description: "Mock agent for UI testing".to_string(), description: "Mock agent for UI testing".to_string(),
@ -4085,7 +4127,8 @@ fn agent_modes_for(agent: AgentId) -> Vec<AgentModeInfo> {
id: "plan".to_string(), id: "plan".to_string(),
name: "Plan".to_string(), name: "Plan".to_string(),
description: "Plan-only mock mode".to_string(), description: "Plan-only mock mode".to_string(),
}], },
],
} }
} }
@ -4317,16 +4360,9 @@ fn claude_tool_result_line(
.to_string() .to_string()
} }
fn claude_control_response_line( fn claude_control_response_line(request_id: &str, behavior: &str, response: Value) -> String {
request_id: &str,
behavior: &str,
response: Value,
) -> String {
let mut response_obj = serde_json::Map::new(); let mut response_obj = serde_json::Map::new();
response_obj.insert( response_obj.insert("behavior".to_string(), Value::String(behavior.to_string()));
"behavior".to_string(),
Value::String(behavior.to_string()),
);
if let Some(message) = response.get("message") { if let Some(message) = response.get("message") {
response_obj.insert("message".to_string(), message.clone()); response_obj.insert("message".to_string(), message.clone());
} }
@ -5032,7 +5068,8 @@ pub mod test_utils {
impl TestHarness { impl TestHarness {
pub async fn new() -> Self { pub async fn new() -> Self {
let temp_dir = TempDir::new().expect("temp dir"); let temp_dir = TempDir::new().expect("temp dir");
let agent_manager = Arc::new(AgentManager::new(temp_dir.path()).expect("agent manager")); let agent_manager =
Arc::new(AgentManager::new(temp_dir.path()).expect("agent manager"));
let session_manager = Arc::new(SessionManager::new(agent_manager)); let session_manager = Arc::new(SessionManager::new(agent_manager));
session_manager session_manager
.server_manager .server_manager
@ -5068,11 +5105,7 @@ pub mod test_utils {
.await; .await;
} }
pub async fn has_session_mapping( pub async fn has_session_mapping(&self, agent: AgentId, session_id: &str) -> bool {
&self,
agent: AgentId,
session_id: &str,
) -> bool {
let sessions = self.session_manager.server_manager.sessions.lock().await; let sessions = self.session_manager.server_manager.sessions.lock().await;
sessions sessions
.get(&agent) .get(&agent)
@ -5111,8 +5144,8 @@ pub mod test_utils {
variant: None, variant: None,
agent_version: None, agent_version: None,
}; };
let mut session = SessionState::new(session_id.to_string(), agent, &request) let mut session =
.expect("session"); SessionState::new(session_id.to_string(), agent, &request).expect("session");
session.native_session_id = native_session_id.map(|id| id.to_string()); session.native_session_id = native_session_id.map(|id| id.to_string());
self.session_manager.sessions.lock().await.push(session); self.session_manager.sessions.lock().await.push(session);
} }
@ -5126,7 +5159,12 @@ pub mod test_utils {
let (stdin_tx, _stdin_rx) = mpsc::unbounded_channel::<String>(); let (stdin_tx, _stdin_rx) = mpsc::unbounded_channel::<String>();
let server = Arc::new(CodexServer::new(stdin_tx)); let server = Arc::new(CodexServer::new(stdin_tx));
let child = Arc::new(std::sync::Mutex::new(child)); let child = Arc::new(std::sync::Mutex::new(child));
self.session_manager.server_manager.servers.lock().await.insert( self.session_manager
.server_manager
.servers
.lock()
.await
.insert(
agent, agent,
ManagedServer { ManagedServer {
kind: ManagedServerKind::Stdio { server }, kind: ManagedServerKind::Stdio { server },
@ -5143,7 +5181,12 @@ pub mod test_utils {
} }
pub async fn insert_http_server(&self, agent: AgentId, instance_id: u64) { pub async fn insert_http_server(&self, agent: AgentId, instance_id: u64) {
self.session_manager.server_manager.servers.lock().await.insert( self.session_manager
.server_manager
.servers
.lock()
.await
.insert(
agent, agent,
ManagedServer { ManagedServer {
kind: ManagedServerKind::Http { kind: ManagedServerKind::Http {
@ -5246,7 +5289,12 @@ pub mod test_utils {
fn default_log_dir() -> PathBuf { fn default_log_dir() -> PathBuf {
dirs::data_dir() dirs::data_dir()
.map(|dir| dir.join("sandbox-agent").join("logs").join("servers")) .map(|dir| dir.join("sandbox-agent").join("logs").join("servers"))
.unwrap_or_else(|| PathBuf::from(".").join(".sandbox-agent").join("logs").join("servers")) .unwrap_or_else(|| {
PathBuf::from(".")
.join(".sandbox-agent")
.join("logs")
.join("servers")
})
} }
fn find_available_port() -> Result<u16, SandboxError> { fn find_available_port() -> Result<u16, SandboxError> {
@ -5297,7 +5345,6 @@ impl SseAccumulator {
} }
} }
fn parse_opencode_modes(value: &Value) -> Vec<AgentModeInfo> { fn parse_opencode_modes(value: &Value) -> Vec<AgentModeInfo> {
let mut modes = Vec::new(); let mut modes = Vec::new();
let mut seen = HashSet::new(); let mut seen = HashSet::new();

View file

@ -90,7 +90,8 @@ pub fn spawn_telemetry_task() {
attempt_send(&client).await; attempt_send(&client).await;
let start = Instant::now() + Duration::from_secs(TELEMETRY_INTERVAL_SECS); let start = Instant::now() + Duration::from_secs(TELEMETRY_INTERVAL_SECS);
let mut interval = tokio::time::interval_at(start, Duration::from_secs(TELEMETRY_INTERVAL_SECS)); let mut interval =
tokio::time::interval_at(start, Duration::from_secs(TELEMETRY_INTERVAL_SECS));
loop { loop {
interval.tick().await; interval.tick().await;
attempt_send(&client).await; attempt_send(&client).await;
@ -150,7 +151,12 @@ fn load_or_create_id() -> String {
} }
} }
if let Ok(mut file) = fs::OpenOptions::new().create(true).write(true).truncate(true).open(&path) { if let Ok(mut file) = fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&path)
{
let _ = file.write_all(id.as_bytes()); let _ = file.write_all(id.as_bytes());
} }
id id
@ -194,7 +200,12 @@ fn write_last_sent(timestamp: i64) {
return; return;
} }
} }
if let Ok(mut file) = fs::OpenOptions::new().create(true).write(true).truncate(true).open(&path) { if let Ok(mut file) = fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&path)
{
let _ = file.write_all(timestamp.to_string().as_bytes()); let _ = file.write_all(timestamp.to_string().as_bytes());
} }
} }
@ -266,7 +277,8 @@ fn detect_provider() -> ProviderInfo {
}; };
} }
if env::var("MODAL_IS_REMOTE").as_deref() == Ok("1") || env::var("MODAL_CLOUD_PROVIDER").is_ok() { if env::var("MODAL_IS_REMOTE").as_deref() == Ok("1") || env::var("MODAL_CLOUD_PROVIDER").is_ok()
{
let metadata = metadata_or_none([ let metadata = metadata_or_none([
("cloudProvider", env::var("MODAL_CLOUD_PROVIDER").ok()), ("cloudProvider", env::var("MODAL_CLOUD_PROVIDER").ok()),
("region", env::var("MODAL_REGION").ok()), ("region", env::var("MODAL_REGION").ok()),
@ -395,7 +407,9 @@ fn detect_docker() -> bool {
false false
} }
fn filter_metadata(pairs: impl IntoIterator<Item = (&'static str, Option<String>)>) -> HashMap<String, String> { fn filter_metadata(
pairs: impl IntoIterator<Item = (&'static str, Option<String>)>,
) -> HashMap<String, String> {
let mut map = HashMap::new(); let mut map = HashMap::new();
for (key, value) in pairs { for (key, value) in pairs {
if let Some(value) = value { if let Some(value) = value {
@ -407,7 +421,9 @@ fn filter_metadata(pairs: impl IntoIterator<Item = (&'static str, Option<String>
map map
} }
fn metadata_or_none(pairs: impl IntoIterator<Item = (&'static str, Option<String>)>) -> Option<HashMap<String, String>> { fn metadata_or_none(
pairs: impl IntoIterator<Item = (&'static str, Option<String>)>,
) -> Option<HashMap<String, String>> {
let map = filter_metadata(pairs); let map = filter_metadata(pairs);
if map.is_empty() { if map.is_empty() {
None None

View file

@ -37,7 +37,11 @@ fn serve_path(path: &str) -> Response {
}; };
let trimmed = path.trim_start_matches('/'); let trimmed = path.trim_start_matches('/');
let target = if trimmed.is_empty() { "index.html" } else { trimmed }; let target = if trimmed.is_empty() {
"index.html"
} else {
trimmed
};
if let Some(file) = dir.get_file(target) { if let Some(file) = dir.get_file(target) {
return file_response(file); return file_response(file);

View file

@ -1,13 +1,13 @@
#[path = "../common/mod.rs"] #[path = "../common/mod.rs"]
mod common; mod common;
use axum::http::Method;
use common::*; use common::*;
use sandbox_agent_agent_management::testing::test_agents_from_env;
use sandbox_agent_agent_management::agents::AgentId; use sandbox_agent_agent_management::agents::AgentId;
use sandbox_agent_agent_management::testing::test_agents_from_env;
use serde_json::Value; use serde_json::Value;
use std::fs; use std::fs;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use axum::http::Method;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_file_edit_flow() { async fn agent_file_edit_flow() {
@ -77,9 +77,7 @@ Do not change any other files. Reply only with DONE after editing.",
let _ = send_status( let _ = send_status(
&app.app, &app.app,
Method::POST, Method::POST,
&format!( &format!("/v1/sessions/{session_id}/permissions/{permission_id}/reply"),
"/v1/sessions/{session_id}/permissions/{permission_id}/reply"
),
Some(serde_json::json!({ "reply": "once" })), Some(serde_json::json!({ "reply": "once" })),
) )
.await; .await;

View file

@ -1,11 +1,11 @@
#[path = "../common/mod.rs"] #[path = "../common/mod.rs"]
mod common; mod common;
use axum::http::Method;
use common::*; use common::*;
use sandbox_agent_agent_management::testing::test_agents_from_env; use sandbox_agent_agent_management::testing::test_agents_from_env;
use std::time::Duration;
use axum::http::Method;
use serde_json::json; use serde_json::json;
use std::time::Duration;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_permission_flow() { async fn agent_permission_flow() {
@ -41,11 +41,17 @@ async fn agent_permission_flow() {
Some(json!({ "reply": "once" })), Some(json!({ "reply": "once" })),
) )
.await; .await;
assert_eq!(status, axum::http::StatusCode::NO_CONTENT, "permission reply"); assert_eq!(
status,
axum::http::StatusCode::NO_CONTENT,
"permission reply"
);
let resolved = poll_events_until(&app.app, &session_id, Duration::from_secs(120), |events| { let resolved =
poll_events_until(&app.app, &session_id, Duration::from_secs(120), |events| {
events.iter().any(|event| { events.iter().any(|event| {
event.get("type").and_then(serde_json::Value::as_str) == Some("permission.resolved") event.get("type").and_then(serde_json::Value::as_str)
== Some("permission.resolved")
}) })
}) })
.await; .await;

View file

@ -1,11 +1,11 @@
#[path = "../common/mod.rs"] #[path = "../common/mod.rs"]
mod common; mod common;
use axum::http::Method;
use common::*; use common::*;
use sandbox_agent_agent_management::testing::test_agents_from_env; use sandbox_agent_agent_management::testing::test_agents_from_env;
use std::time::Duration;
use axum::http::Method;
use serde_json::json; use serde_json::json;
use std::time::Duration;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_question_flow() { async fn agent_question_flow() {
@ -44,9 +44,11 @@ async fn agent_question_flow() {
.await; .await;
assert_eq!(status, axum::http::StatusCode::NO_CONTENT, "question reply"); assert_eq!(status, axum::http::StatusCode::NO_CONTENT, "question reply");
let resolved = poll_events_until(&app.app, &session_id, Duration::from_secs(120), |events| { let resolved =
poll_events_until(&app.app, &session_id, Duration::from_secs(120), |events| {
events.iter().any(|event| { events.iter().any(|event| {
event.get("type").and_then(serde_json::Value::as_str) == Some("question.resolved") event.get("type").and_then(serde_json::Value::as_str)
== Some("question.resolved")
}) })
}) })
.await; .await;

View file

@ -1,11 +1,11 @@
#[path = "../common/mod.rs"] #[path = "../common/mod.rs"]
mod common; mod common;
use axum::http::Method;
use common::*; use common::*;
use sandbox_agent_agent_management::testing::test_agents_from_env; use sandbox_agent_agent_management::testing::test_agents_from_env;
use std::time::Duration;
use axum::http::Method;
use serde_json::json; use serde_json::json;
use std::time::Duration;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_termination() { async fn agent_termination() {
@ -26,13 +26,20 @@ async fn agent_termination() {
None, None,
) )
.await; .await;
assert_eq!(status, axum::http::StatusCode::NO_CONTENT, "terminate session"); assert_eq!(
status,
axum::http::StatusCode::NO_CONTENT,
"terminate session"
);
let events = poll_events_until(&app.app, &session_id, Duration::from_secs(30), |events| { let events = poll_events_until(&app.app, &session_id, Duration::from_secs(30), |events| {
has_event_type(events, "session.ended") has_event_type(events, "session.ended")
}) })
.await; .await;
assert!(has_event_type(&events, "session.ended"), "missing session.ended"); assert!(
has_event_type(&events, "session.ended"),
"missing session.ended"
);
let status = send_status( let status = send_status(
&app.app, &app.app,
@ -41,6 +48,9 @@ async fn agent_termination() {
Some(json!({ "message": PROMPT })), Some(json!({ "message": PROMPT })),
) )
.await; .await;
assert!(!status.is_success(), "terminated session should reject messages"); assert!(
!status.is_success(),
"terminated session should reject messages"
);
} }
} }

View file

@ -1,11 +1,11 @@
#[path = "../common/mod.rs"] #[path = "../common/mod.rs"]
mod common; mod common;
use axum::http::Method;
use common::*; use common::*;
use sandbox_agent_agent_management::testing::test_agents_from_env; use sandbox_agent_agent_management::testing::test_agents_from_env;
use serde_json::Value; use serde_json::Value;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use axum::http::Method;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_tool_flow() { async fn agent_tool_flow() {
@ -61,9 +61,7 @@ async fn agent_tool_flow() {
let _ = send_status( let _ = send_status(
&app.app, &app.app,
Method::POST, Method::POST,
&format!( &format!("/v1/sessions/{session_id}/permissions/{permission_id}/reply"),
"/v1/sessions/{session_id}/permissions/{permission_id}/reply"
),
Some(serde_json::json!({ "reply": "once" })), Some(serde_json::json!({ "reply": "once" })),
) )
.await; .await;

View file

@ -36,11 +36,19 @@ fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>>
let env = build_env(); let env = build_env();
assert!(!env.is_empty(), "expected credentials to be available"); assert!(!env.is_empty(), "expected credentials to be available");
let agents = [AgentId::Claude, AgentId::Codex, AgentId::Opencode, AgentId::Amp]; let agents = [
AgentId::Claude,
AgentId::Codex,
AgentId::Opencode,
AgentId::Amp,
];
for agent in agents { for agent in agents {
let install = manager.install(agent, InstallOptions::default())?; let install = manager.install(agent, InstallOptions::default())?;
assert!(install.path.exists(), "expected install for {agent}"); assert!(install.path.exists(), "expected install for {agent}");
assert!(manager.is_installed(agent), "expected is_installed for {agent}"); assert!(
manager.is_installed(agent),
"expected is_installed for {agent}"
);
manager.install( manager.install(
agent, agent,
InstallOptions { InstallOptions {
@ -70,9 +78,15 @@ fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>>
); );
let combined = format!("{}{}", result.stdout, result.stderr); let combined = format!("{}{}", result.stdout, result.stderr);
let output = result.result.clone().unwrap_or(combined); let output = result.result.clone().unwrap_or(combined);
assert!(output.contains("OK"), "expected OK for {agent}, got: {output}"); assert!(
output.contains("OK"),
"expected OK for {agent}, got: {output}"
);
if agent == AgentId::Claude || agent == AgentId::Opencode || (agent == AgentId::Amp && amp_configured()) { if agent == AgentId::Claude
|| agent == AgentId::Opencode
|| (agent == AgentId::Amp && amp_configured())
{
let mut resume = SpawnOptions::new(prompt_ok("OK2")); let mut resume = SpawnOptions::new(prompt_ok("OK2"));
resume.env = env.clone(); resume.env = env.clone();
resume.session_id = result.session_id.clone(); resume.session_id = result.session_id.clone();
@ -84,12 +98,17 @@ fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>>
); );
let combined = format!("{}{}", resumed.stdout, resumed.stderr); let combined = format!("{}{}", resumed.stdout, resumed.stderr);
let output = resumed.result.clone().unwrap_or(combined); let output = resumed.result.clone().unwrap_or(combined);
assert!(output.contains("OK2"), "expected OK2 for {agent}, got: {output}"); assert!(
output.contains("OK2"),
"expected OK2 for {agent}, got: {output}"
);
} else if agent == AgentId::Codex { } else if agent == AgentId::Codex {
let mut resume = SpawnOptions::new(prompt_ok("OK2")); let mut resume = SpawnOptions::new(prompt_ok("OK2"));
resume.env = env.clone(); resume.env = env.clone();
resume.session_id = result.session_id.clone(); resume.session_id = result.session_id.clone();
let err = manager.spawn(agent, resume).expect_err("expected resume error for codex"); let err = manager
.spawn(agent, resume)
.expect_err("expected resume error for codex");
assert!(matches!(err, AgentError::ResumeUnsupported { .. })); assert!(matches!(err, AgentError::ResumeUnsupported { .. }));
} }
@ -105,7 +124,10 @@ fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>>
); );
let combined = format!("{}{}", planned.stdout, planned.stderr); let combined = format!("{}{}", planned.stdout, planned.stderr);
let output = planned.result.clone().unwrap_or(combined); let output = planned.result.clone().unwrap_or(combined);
assert!(output.contains("OK3"), "expected OK3 for {agent}, got: {output}"); assert!(
output.contains("OK3"),
"expected OK3 for {agent}, got: {output}"
);
} }
} }
} }

View file

@ -9,12 +9,7 @@ use serde_json::{json, Value};
use tempfile::TempDir; use tempfile::TempDir;
use tower::util::ServiceExt; use tower::util::ServiceExt;
use sandbox_agent::router::{ use sandbox_agent::router::{build_router, AgentCapabilities, AgentListResponse, AuthConfig};
build_router,
AgentCapabilities,
AgentListResponse,
AuthConfig,
};
use sandbox_agent_agent_credentials::ExtractedCredentials; use sandbox_agent_agent_credentials::ExtractedCredentials;
use sandbox_agent_agent_management::agents::{AgentId, AgentManager}; use sandbox_agent_agent_management::agents::{AgentId, AgentManager};
@ -32,8 +27,7 @@ pub struct TestApp {
impl TestApp { impl TestApp {
pub fn new() -> Self { pub fn new() -> Self {
let install_dir = tempfile::tempdir().expect("create temp install dir"); let install_dir = tempfile::tempdir().expect("create temp install dir");
let manager = AgentManager::new(install_dir.path()) let manager = AgentManager::new(install_dir.path()).expect("create agent manager");
.expect("create agent manager");
let state = sandbox_agent::router::AppState::new(AuthConfig::disabled(), manager); let state = sandbox_agent::router::AppState::new(AuthConfig::disabled(), manager);
let app = build_router(state); let app = build_router(state);
Self { Self {
@ -59,7 +53,12 @@ impl Drop for EnvGuard {
} }
pub fn apply_credentials(creds: &ExtractedCredentials) -> EnvGuard { pub fn apply_credentials(creds: &ExtractedCredentials) -> EnvGuard {
let keys = ["ANTHROPIC_API_KEY", "CLAUDE_API_KEY", "OPENAI_API_KEY", "CODEX_API_KEY"]; let keys = [
"ANTHROPIC_API_KEY",
"CLAUDE_API_KEY",
"OPENAI_API_KEY",
"CODEX_API_KEY",
];
let mut saved = HashMap::new(); let mut saved = HashMap::new();
for key in keys { for key in keys {
saved.insert(key.to_string(), std::env::var(key).ok()); saved.insert(key.to_string(), std::env::var(key).ok());
@ -100,13 +99,11 @@ pub async fn send_json(
.method(method) .method(method)
.uri(path) .uri(path)
.header("content-type", "application/json") .header("content-type", "application/json")
.body(Body::from(body.map(|value| value.to_string()).unwrap_or_default())) .body(Body::from(
body.map(|value| value.to_string()).unwrap_or_default(),
))
.expect("request"); .expect("request");
let response = app let response = app.clone().oneshot(request).await.expect("response");
.clone()
.oneshot(request)
.await
.expect("response");
let status = response.status(); let status = response.status();
let bytes = response let bytes = response
.into_body() .into_body()
@ -140,15 +137,15 @@ pub async fn install_agent(app: &Router, agent: AgentId) {
Some(json!({})), Some(json!({})),
) )
.await; .await;
assert_eq!(status, StatusCode::NO_CONTENT, "install agent {}", agent.as_str()); assert_eq!(
status,
StatusCode::NO_CONTENT,
"install agent {}",
agent.as_str()
);
} }
pub async fn create_session( pub async fn create_session(app: &Router, agent: AgentId, session_id: &str, permission_mode: &str) {
app: &Router,
agent: AgentId,
session_id: &str,
permission_mode: &str,
) {
let status = send_status( let status = send_status(
app, app,
Method::POST, Method::POST,

View file

@ -28,11 +28,7 @@ async fn register_and_unregister_sessions() {
.register_session(AgentId::Codex, "sess-1", Some("thread-1")) .register_session(AgentId::Codex, "sess-1", Some("thread-1"))
.await; .await;
assert!( assert!(harness.has_session_mapping(AgentId::Codex, "sess-1").await);
harness
.has_session_mapping(AgentId::Codex, "sess-1")
.await
);
assert_eq!( assert_eq!(
harness harness
.native_mapping(AgentId::Codex, "thread-1") .native_mapping(AgentId::Codex, "thread-1")
@ -45,17 +41,11 @@ async fn register_and_unregister_sessions() {
.unregister_session(AgentId::Codex, "sess-1", Some("thread-1")) .unregister_session(AgentId::Codex, "sess-1", Some("thread-1"))
.await; .await;
assert!( assert!(!harness.has_session_mapping(AgentId::Codex, "sess-1").await);
!harness assert!(harness
.has_session_mapping(AgentId::Codex, "sess-1")
.await
);
assert!(
harness
.native_mapping(AgentId::Codex, "thread-1") .native_mapping(AgentId::Codex, "thread-1")
.await .await
.is_none() .is_none());
);
} }
#[tokio::test] #[tokio::test]
@ -92,9 +82,7 @@ async fn handle_process_exit_marks_error_and_ends_sessions() {
harness harness
.register_session(AgentId::Codex, "sess-1", Some("thread-1")) .register_session(AgentId::Codex, "sess-1", Some("thread-1"))
.await; .await;
harness harness.insert_stdio_server(AgentId::Codex, None, 1).await;
.insert_stdio_server(AgentId::Codex, None, 1)
.await;
harness harness
.handle_process_exit(AgentId::Codex, 1, exit_status(7)) .handle_process_exit(AgentId::Codex, 1, exit_status(7))
@ -104,13 +92,11 @@ async fn handle_process_exit_marks_error_and_ends_sessions() {
harness.server_status(AgentId::Codex).await, harness.server_status(AgentId::Codex).await,
Some(sandbox_agent::router::ServerStatus::Error) Some(sandbox_agent::router::ServerStatus::Error)
)); ));
assert!( assert!(harness
harness
.server_last_error(AgentId::Codex) .server_last_error(AgentId::Codex)
.await .await
.unwrap_or_default() .unwrap_or_default()
.contains("exited") .contains("exited"));
);
assert!(harness.session_ended("sess-1").await); assert!(harness.session_ended("sess-1").await);
assert!(matches!( assert!(matches!(
harness.session_end_reason("sess-1").await, harness.session_end_reason("sess-1").await,

View file

@ -1,6 +1,6 @@
mod session_lifecycle;
mod multi_turn; mod multi_turn;
mod permissions; mod permissions;
mod questions; mod questions;
mod reasoning; mod reasoning;
mod session_lifecycle;
mod status; mod status;

View file

@ -81,7 +81,12 @@ async fn multi_turn_snapshots() {
install_agent(&app.app, config.agent).await; install_agent(&app.app, config.agent).await;
let session_id = format!("multi-turn-{}", config.agent.as_str()); let session_id = format!("multi-turn-{}", config.agent.as_str());
create_session(&app.app, config.agent, &session_id, test_permission_mode(config.agent)) create_session(
&app.app,
config.agent,
&session_id,
test_permission_mode(config.agent),
)
.await; .await;
send_message_with_text(&app.app, &session_id, FIRST_PROMPT).await; send_message_with_text(&app.app, &session_id, FIRST_PROMPT).await;
@ -100,13 +105,8 @@ async fn multi_turn_snapshots() {
); );
send_message_with_text(&app.app, &session_id, SECOND_PROMPT).await; send_message_with_text(&app.app, &session_id, SECOND_PROMPT).await;
let (second_events, _offset) = poll_events_until_from( let (second_events, _offset) =
&app.app, poll_events_until_from(&app.app, &session_id, offset, Duration::from_secs(120)).await;
&session_id,
offset,
Duration::from_secs(120),
)
.await;
let second_events = truncate_after_first_stop(&second_events); let second_events = truncate_after_first_stop(&second_events);
assert!( assert!(
!second_events.is_empty(), !second_events.is_empty(),

View file

@ -55,9 +55,7 @@ async fn permission_flow_snapshots() {
let status = send_status( let status = send_status(
&app.app, &app.app,
Method::POST, Method::POST,
&format!( &format!("/v1/sessions/{permission_session}/permissions/{permission_id}/reply"),
"/v1/sessions/{permission_session}/permissions/{permission_id}/reply"
),
Some(json!({ "reply": "once" })), Some(json!({ "reply": "once" })),
) )
.await; .await;
@ -67,9 +65,7 @@ async fn permission_flow_snapshots() {
let (status, payload) = send_json( let (status, payload) = send_json(
&app.app, &app.app,
Method::POST, Method::POST,
&format!( &format!("/v1/sessions/{permission_session}/permissions/missing-permission/reply"),
"/v1/sessions/{permission_session}/permissions/missing-permission/reply"
),
Some(json!({ "reply": "once" })), Some(json!({ "reply": "once" })),
) )
.await; .await;

View file

@ -55,9 +55,7 @@ async fn question_flow_snapshots() {
let status = send_status( let status = send_status(
&app.app, &app.app,
Method::POST, Method::POST,
&format!( &format!("/v1/sessions/{question_reply_session}/questions/{question_id}/reply"),
"/v1/sessions/{question_reply_session}/questions/{question_id}/reply"
),
Some(json!({ "answers": answers })), Some(json!({ "answers": answers })),
) )
.await; .await;
@ -67,9 +65,7 @@ async fn question_flow_snapshots() {
let (status, payload) = send_json( let (status, payload) = send_json(
&app.app, &app.app,
Method::POST, Method::POST,
&format!( &format!("/v1/sessions/{question_reply_session}/questions/missing-question/reply"),
"/v1/sessions/{question_reply_session}/questions/missing-question/reply"
),
Some(json!({ "answers": [] })), Some(json!({ "answers": [] })),
) )
.await; .await;
@ -92,7 +88,11 @@ async fn question_flow_snapshots() {
Some(json!({ "message": QUESTION_PROMPT })), Some(json!({ "message": QUESTION_PROMPT })),
) )
.await; .await;
assert_eq!(status, StatusCode::NO_CONTENT, "send question prompt reject"); assert_eq!(
status,
StatusCode::NO_CONTENT,
"send question prompt reject"
);
let reject_events = poll_events_until_match( let reject_events = poll_events_until_match(
&app.app, &app.app,
@ -108,9 +108,7 @@ async fn question_flow_snapshots() {
let status = send_status( let status = send_status(
&app.app, &app.app,
Method::POST, Method::POST,
&format!( &format!("/v1/sessions/{question_reject_session}/questions/{question_id}/reject"),
"/v1/sessions/{question_reject_session}/questions/{question_id}/reject"
),
None, None,
) )
.await; .await;
@ -126,7 +124,10 @@ async fn question_flow_snapshots() {
None, None,
) )
.await; .await;
assert!(!status.is_success(), "missing question id reject should error"); assert!(
!status.is_success(),
"missing question id reject should error"
);
assert_session_snapshot( assert_session_snapshot(
"question_reject_missing", "question_reject_missing",
json!({ json!({

View file

@ -23,7 +23,12 @@ async fn reasoning_events_present() {
install_agent(&app.app, config.agent).await; install_agent(&app.app, config.agent).await;
let session_id = format!("reasoning-{}", config.agent.as_str()); let session_id = format!("reasoning-{}", config.agent.as_str());
create_session(&app.app, config.agent, &session_id, test_permission_mode(config.agent)) create_session(
&app.app,
config.agent,
&session_id,
test_permission_mode(config.agent),
)
.await; .await;
let status = send_status( let status = send_status(
&app.app, &app.app,
@ -34,12 +39,10 @@ async fn reasoning_events_present() {
.await; .await;
assert_eq!(status, StatusCode::NO_CONTENT, "send reasoning prompt"); assert_eq!(status, StatusCode::NO_CONTENT, "send reasoning prompt");
let events = poll_events_until_match( let events =
&app.app, poll_events_until_match(&app.app, &session_id, Duration::from_secs(120), |events| {
&session_id, events_have_content_type(events, "reasoning") || events.iter().any(is_error_event)
Duration::from_secs(120), })
|events| events_have_content_type(events, "reasoning") || events.iter().any(is_error_event),
)
.await; .await;
assert!( assert!(
events_have_content_type(&events, "reasoning"), events_have_content_type(&events, "reasoning"),

View file

@ -28,7 +28,12 @@ async fn status_events_present() {
install_agent(&app.app, config.agent).await; install_agent(&app.app, config.agent).await;
let session_id = format!("status-{}", config.agent.as_str()); let session_id = format!("status-{}", config.agent.as_str());
create_session(&app.app, config.agent, &session_id, test_permission_mode(config.agent)) create_session(
&app.app,
config.agent,
&session_id,
test_permission_mode(config.agent),
)
.await; .await;
let status = send_status( let status = send_status(
&app.app, &app.app,
@ -39,12 +44,10 @@ async fn status_events_present() {
.await; .await;
assert_eq!(status, StatusCode::NO_CONTENT, "send status prompt"); assert_eq!(status, StatusCode::NO_CONTENT, "send status prompt");
let events = poll_events_until_match( let events =
&app.app, poll_events_until_match(&app.app, &session_id, Duration::from_secs(120), |events| {
&session_id, events_have_status(events) || events.iter().any(is_error_event)
Duration::from_secs(120), })
|events| events_have_status(events) || events.iter().any(is_error_event),
)
.await; .await;
assert!( assert!(
events_have_status(&events), events_have_status(&events),

View file

@ -1,9 +1,9 @@
use axum::body::Body; use axum::body::Body;
use axum::http::{Request, StatusCode}; use axum::http::{Request, StatusCode};
use http_body_util::BodyExt; use http_body_util::BodyExt;
use sandbox_agent_agent_management::agents::AgentManager;
use sandbox_agent::router::{build_router, AppState, AuthConfig}; use sandbox_agent::router::{build_router, AppState, AuthConfig};
use sandbox_agent::ui; use sandbox_agent::ui;
use sandbox_agent_agent_management::agents::AgentManager;
use tempfile::TempDir; use tempfile::TempDir;
use tower::util::ServiceExt; use tower::util::ServiceExt;
@ -22,10 +22,7 @@ async fn serves_inspector_ui() {
.uri("/ui") .uri("/ui")
.body(Body::empty()) .body(Body::empty())
.expect("build request"); .expect("build request");
let response = app let response = app.oneshot(request).await.expect("request handled");
.oneshot(request)
.await
.expect("request handled");
assert_eq!(response.status(), StatusCode::OK); assert_eq!(response.status(), StatusCode::OK);

View file

@ -4,20 +4,9 @@ use serde_json::Value;
use crate::amp as schema; use crate::amp as schema;
use crate::{ use crate::{
ContentPart, ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole,
ErrorData, ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy, UniversalEventData,
EventConversion, UniversalEventType, UniversalItem,
ItemDeltaData,
ItemEventData,
ItemKind,
ItemRole,
ItemStatus,
SessionEndedData,
SessionEndReason,
TerminatedBy,
UniversalEventData,
UniversalEventType,
UniversalItem,
}; };
static TEMP_ID: AtomicU64 = AtomicU64::new(1); static TEMP_ID: AtomicU64 = AtomicU64::new(1);
@ -27,7 +16,9 @@ fn next_temp_id(prefix: &str) -> String {
format!("{prefix}_{id}") format!("{prefix}_{id}")
} }
pub fn event_to_universal(event: &schema::StreamJsonMessage) -> Result<Vec<EventConversion>, String> { pub fn event_to_universal(
event: &schema::StreamJsonMessage,
) -> Result<Vec<EventConversion>, String> {
let mut events = Vec::new(); let mut events = Vec::new();
match event.type_ { match event.type_ {
schema::StreamJsonMessageType::Message => { schema::StreamJsonMessageType::Message => {
@ -49,12 +40,17 @@ pub fn event_to_universal(event: &schema::StreamJsonMessage) -> Result<Vec<Event
let arguments = match call.arguments { let arguments = match call.arguments {
schema::ToolCallArguments::Variant0(text) => text, schema::ToolCallArguments::Variant0(text) => text,
schema::ToolCallArguments::Variant1(map) => { schema::ToolCallArguments::Variant1(map) => {
serde_json::to_string(&Value::Object(map)).unwrap_or_else(|_| "{}".to_string()) serde_json::to_string(&Value::Object(map))
.unwrap_or_else(|_| "{}".to_string())
} }
}; };
(call.name, arguments, call.id) (call.name, arguments, call.id)
} else { } else {
("unknown".to_string(), "{}".to_string(), next_temp_id("tmp_amp_tool")) (
"unknown".to_string(),
"{}".to_string(),
next_temp_id("tmp_amp_tool"),
)
}; };
let item = UniversalItem { let item = UniversalItem {
item_id: next_temp_id("tmp_amp_tool_call"), item_id: next_temp_id("tmp_amp_tool_call"),
@ -83,16 +79,16 @@ pub fn event_to_universal(event: &schema::StreamJsonMessage) -> Result<Vec<Event
parent_id: None, parent_id: None,
kind: ItemKind::ToolResult, kind: ItemKind::ToolResult,
role: Some(ItemRole::Tool), role: Some(ItemRole::Tool),
content: vec![ContentPart::ToolResult { content: vec![ContentPart::ToolResult { call_id, output }],
call_id,
output,
}],
status: ItemStatus::Completed, status: ItemStatus::Completed,
}; };
events.extend(item_events(item)); events.extend(item_events(item));
} }
schema::StreamJsonMessageType::Error => { schema::StreamJsonMessageType::Error => {
let message = event.error.clone().unwrap_or_else(|| "amp error".to_string()); let message = event
.error
.clone()
.unwrap_or_else(|| "amp error".to_string());
events.push(EventConversion::new( events.push(EventConversion::new(
UniversalEventType::Error, UniversalEventType::Error,
UniversalEventData::Error(ErrorData { UniversalEventData::Error(ErrorData {

View file

@ -3,21 +3,9 @@ use std::sync::atomic::{AtomicU64, Ordering};
use serde_json::Value; use serde_json::Value;
use crate::{ use crate::{
ContentPart, ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus,
EventConversion, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, SessionStartedData,
ItemEventData, UniversalEventData, UniversalEventType, UniversalItem,
ItemDeltaData,
ItemKind,
ItemRole,
ItemStatus,
PermissionEventData,
PermissionStatus,
QuestionEventData,
QuestionStatus,
SessionStartedData,
UniversalEventData,
UniversalEventType,
UniversalItem,
}; };
static TEMP_ID: AtomicU64 = AtomicU64::new(1); static TEMP_ID: AtomicU64 = AtomicU64::new(1);
@ -56,7 +44,10 @@ fn system_event_to_universal(event: &Value) -> EventConversion {
let data = SessionStartedData { let data = SessionStartedData {
metadata: Some(event.clone()), metadata: Some(event.clone()),
}; };
EventConversion::new(UniversalEventType::SessionStarted, UniversalEventData::SessionStarted(data)) EventConversion::new(
UniversalEventType::SessionStarted,
UniversalEventData::SessionStarted(data),
)
.with_raw(Some(event.clone())) .with_raw(Some(event.clone()))
} }
@ -97,12 +88,15 @@ fn assistant_event_to_universal(event: &Value, session_id: &str) -> Vec<EventCon
); );
let is_question_tool = matches!( let is_question_tool = matches!(
name, name,
"AskUserQuestion" | "ask_user_question" | "askUserQuestion" "AskUserQuestion"
| "ask_user_question"
| "askUserQuestion"
| "ask-user-question" | "ask-user-question"
) || is_exit_plan_mode; ) || is_exit_plan_mode;
let has_question_payload = input.get("questions").is_some(); let has_question_payload = input.get("questions").is_some();
if is_question_tool || has_question_payload { if is_question_tool || has_question_payload {
if let Some(question) = question_from_claude_input(&input, call_id.clone()) { if let Some(question) = question_from_claude_input(&input, call_id.clone())
{
conversions.push( conversions.push(
EventConversion::new( EventConversion::new(
UniversalEventType::QuestionRequested, UniversalEventType::QuestionRequested,
@ -117,10 +111,7 @@ fn assistant_event_to_universal(event: &Value, session_id: &str) -> Vec<EventCon
UniversalEventData::Question(QuestionEventData { UniversalEventData::Question(QuestionEventData {
question_id: call_id.clone(), question_id: call_id.clone(),
prompt: "Approve plan execution?".to_string(), prompt: "Approve plan execution?".to_string(),
options: vec![ options: vec!["approve".to_string(), "reject".to_string()],
"approve".to_string(),
"reject".to_string(),
],
response: None, response: None,
status: QuestionStatus::Requested, status: QuestionStatus::Requested,
}), }),
@ -129,7 +120,8 @@ fn assistant_event_to_universal(event: &Value, session_id: &str) -> Vec<EventCon
); );
} }
} }
let arguments = serde_json::to_string(&input).unwrap_or_else(|_| "{}".to_string()); let arguments =
serde_json::to_string(&input).unwrap_or_else(|_| "{}".to_string());
let tool_item = UniversalItem { let tool_item = UniversalItem {
item_id: String::new(), item_id: String::new(),
native_item_id: Some(call_id.clone()), native_item_id: Some(call_id.clone()),
@ -369,13 +361,12 @@ fn control_request_to_universal(event: &Value) -> Result<Vec<EventConversion>, S
.get("request") .get("request")
.and_then(Value::as_object) .and_then(Value::as_object)
.ok_or_else(|| "missing request".to_string())?; .ok_or_else(|| "missing request".to_string())?;
let subtype = request let subtype = request.get("subtype").and_then(Value::as_str).unwrap_or("");
.get("subtype")
.and_then(Value::as_str)
.unwrap_or("");
if subtype != "can_use_tool" { if subtype != "can_use_tool" {
return Err(format!("unsupported Claude control_request subtype: {subtype}")); return Err(format!(
"unsupported Claude control_request subtype: {subtype}"
));
} }
let tool_name = request let tool_name = request
@ -387,10 +378,7 @@ fn control_request_to_universal(event: &Value) -> Result<Vec<EventConversion>, S
.get("permission_suggestions") .get("permission_suggestions")
.cloned() .cloned()
.unwrap_or(Value::Null); .unwrap_or(Value::Null);
let blocked_path = request let blocked_path = request.get("blocked_path").cloned().unwrap_or(Value::Null);
.get("blocked_path")
.cloned()
.unwrap_or(Value::Null);
let metadata = serde_json::json!({ let metadata = serde_json::json!({
"toolName": tool_name, "toolName": tool_name,

View file

@ -2,22 +2,9 @@ use serde_json::Value;
use crate::codex as schema; use crate::codex as schema;
use crate::{ use crate::{
ContentPart, ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole,
ErrorData, ItemStatus, ReasoningVisibility, SessionEndReason, SessionEndedData, SessionStartedData,
EventConversion, TerminatedBy, UniversalEventData, UniversalEventType, UniversalItem,
ItemDeltaData,
ItemEventData,
ItemKind,
ItemRole,
ItemStatus,
ReasoningVisibility,
SessionEndedData,
SessionEndReason,
SessionStartedData,
TerminatedBy,
UniversalEventData,
UniversalEventType,
UniversalItem,
}; };
/// Convert a Codex ServerNotification to universal events. /// Convert a Codex ServerNotification to universal events.
@ -30,14 +17,12 @@ pub fn notification_to_universal(
let data = SessionStartedData { let data = SessionStartedData {
metadata: serde_json::to_value(&params.thread).ok(), metadata: serde_json::to_value(&params.thread).ok(),
}; };
Ok(vec![ Ok(vec![EventConversion::new(
EventConversion::new(
UniversalEventType::SessionStarted, UniversalEventType::SessionStarted,
UniversalEventData::SessionStarted(data), UniversalEventData::SessionStarted(data),
) )
.with_native_session(Some(params.thread.id.clone())) .with_native_session(Some(params.thread.id.clone()))
.with_raw(raw), .with_raw(raw)])
])
} }
schema::ServerNotification::ThreadCompacted(params) => Ok(vec![status_event( schema::ServerNotification::ThreadCompacted(params) => Ok(vec![status_event(
"thread.compacted", "thread.compacted",
@ -77,28 +62,24 @@ pub fn notification_to_universal(
)]), )]),
schema::ServerNotification::ItemStarted(params) => { schema::ServerNotification::ItemStarted(params) => {
let item = thread_item_to_item(&params.item, ItemStatus::InProgress); let item = thread_item_to_item(&params.item, ItemStatus::InProgress);
Ok(vec![ Ok(vec![EventConversion::new(
EventConversion::new(
UniversalEventType::ItemStarted, UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }), UniversalEventData::Item(ItemEventData { item }),
) )
.with_native_session(Some(params.thread_id.clone())) .with_native_session(Some(params.thread_id.clone()))
.with_raw(raw), .with_raw(raw)])
])
} }
schema::ServerNotification::ItemCompleted(params) => { schema::ServerNotification::ItemCompleted(params) => {
let item = thread_item_to_item(&params.item, ItemStatus::Completed); let item = thread_item_to_item(&params.item, ItemStatus::Completed);
Ok(vec![ Ok(vec![EventConversion::new(
EventConversion::new(
UniversalEventType::ItemCompleted, UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }), UniversalEventData::Item(ItemEventData { item }),
) )
.with_native_session(Some(params.thread_id.clone())) .with_native_session(Some(params.thread_id.clone()))
.with_raw(raw), .with_raw(raw)])
])
} }
schema::ServerNotification::ItemAgentMessageDelta(params) => Ok(vec![ schema::ServerNotification::ItemAgentMessageDelta(params) => {
EventConversion::new( Ok(vec![EventConversion::new(
UniversalEventType::ItemDelta, UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData { UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(), item_id: String::new(),
@ -107,10 +88,10 @@ pub fn notification_to_universal(
}), }),
) )
.with_native_session(Some(params.thread_id.clone())) .with_native_session(Some(params.thread_id.clone()))
.with_raw(raw), .with_raw(raw)])
]), }
schema::ServerNotification::ItemReasoningTextDelta(params) => Ok(vec![ schema::ServerNotification::ItemReasoningTextDelta(params) => {
EventConversion::new( Ok(vec![EventConversion::new(
UniversalEventType::ItemDelta, UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData { UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(), item_id: String::new(),
@ -119,10 +100,10 @@ pub fn notification_to_universal(
}), }),
) )
.with_native_session(Some(params.thread_id.clone())) .with_native_session(Some(params.thread_id.clone()))
.with_raw(raw), .with_raw(raw)])
]), }
schema::ServerNotification::ItemReasoningSummaryTextDelta(params) => Ok(vec![ schema::ServerNotification::ItemReasoningSummaryTextDelta(params) => {
EventConversion::new( Ok(vec![EventConversion::new(
UniversalEventType::ItemDelta, UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData { UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(), item_id: String::new(),
@ -131,10 +112,10 @@ pub fn notification_to_universal(
}), }),
) )
.with_native_session(Some(params.thread_id.clone())) .with_native_session(Some(params.thread_id.clone()))
.with_raw(raw), .with_raw(raw)])
]), }
schema::ServerNotification::ItemCommandExecutionOutputDelta(params) => Ok(vec![ schema::ServerNotification::ItemCommandExecutionOutputDelta(params) => {
EventConversion::new( Ok(vec![EventConversion::new(
UniversalEventType::ItemDelta, UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData { UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(), item_id: String::new(),
@ -143,10 +124,10 @@ pub fn notification_to_universal(
}), }),
) )
.with_native_session(Some(params.thread_id.clone())) .with_native_session(Some(params.thread_id.clone()))
.with_raw(raw), .with_raw(raw)])
]), }
schema::ServerNotification::ItemFileChangeOutputDelta(params) => Ok(vec![ schema::ServerNotification::ItemFileChangeOutputDelta(params) => {
EventConversion::new( Ok(vec![EventConversion::new(
UniversalEventType::ItemDelta, UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData { UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(), item_id: String::new(),
@ -155,10 +136,10 @@ pub fn notification_to_universal(
}), }),
) )
.with_native_session(Some(params.thread_id.clone())) .with_native_session(Some(params.thread_id.clone()))
.with_raw(raw), .with_raw(raw)])
]), }
schema::ServerNotification::ItemCommandExecutionTerminalInteraction(params) => Ok(vec![ schema::ServerNotification::ItemCommandExecutionTerminalInteraction(params) => {
EventConversion::new( Ok(vec![EventConversion::new(
UniversalEventType::ItemDelta, UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData { UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(), item_id: String::new(),
@ -167,33 +148,34 @@ pub fn notification_to_universal(
}), }),
) )
.with_native_session(Some(params.thread_id.clone())) .with_native_session(Some(params.thread_id.clone()))
.with_raw(raw), .with_raw(raw)])
]), }
schema::ServerNotification::ItemMcpToolCallProgress(params) => Ok(vec![status_event( schema::ServerNotification::ItemMcpToolCallProgress(params) => Ok(vec![status_event(
"mcp.progress", "mcp.progress",
serde_json::to_string(params).ok(), serde_json::to_string(params).ok(),
Some(params.thread_id.clone()), Some(params.thread_id.clone()),
raw, raw,
)]), )]),
schema::ServerNotification::ItemReasoningSummaryPartAdded(params) => Ok(vec![ schema::ServerNotification::ItemReasoningSummaryPartAdded(params) => {
status_event( Ok(vec![status_event(
"reasoning.summary.part_added", "reasoning.summary.part_added",
serde_json::to_string(params).ok(), serde_json::to_string(params).ok(),
Some(params.thread_id.clone()), Some(params.thread_id.clone()),
raw, raw,
), )])
]), }
schema::ServerNotification::Error(params) => { schema::ServerNotification::Error(params) => {
let data = ErrorData { let data = ErrorData {
message: params.error.message.clone(), message: params.error.message.clone(),
code: None, code: None,
details: serde_json::to_value(params).ok(), details: serde_json::to_value(params).ok(),
}; };
Ok(vec![ Ok(vec![EventConversion::new(
EventConversion::new(UniversalEventType::Error, UniversalEventData::Error(data)) UniversalEventType::Error,
UniversalEventData::Error(data),
)
.with_native_session(Some(params.thread_id.clone())) .with_native_session(Some(params.thread_id.clone()))
.with_raw(raw), .with_raw(raw)])
])
} }
schema::ServerNotification::RawResponseItemCompleted(params) => Ok(vec![status_event( schema::ServerNotification::RawResponseItemCompleted(params) => Ok(vec![status_event(
"raw.item.completed", "raw.item.completed",
@ -239,7 +221,11 @@ fn thread_item_to_item(item: &schema::ThreadItem, status: ItemStatus) -> Univers
content: vec![ContentPart::Text { text: text.clone() }], content: vec![ContentPart::Text { text: text.clone() }],
status, status,
}, },
schema::ThreadItem::Reasoning { content, id, summary } => { schema::ThreadItem::Reasoning {
content,
id,
summary,
} => {
let mut parts = Vec::new(); let mut parts = Vec::new();
for line in content { for line in content {
parts.push(ContentPart::Reasoning { parts.push(ContentPart::Reasoning {
@ -295,7 +281,11 @@ fn thread_item_to_item(item: &schema::ThreadItem, status: ItemStatus) -> Univers
status, status,
} }
} }
schema::ThreadItem::FileChange { changes, id, status: file_status } => UniversalItem { schema::ThreadItem::FileChange {
changes,
id,
status: file_status,
} => UniversalItem {
item_id: String::new(), item_id: String::new(),
native_item_id: Some(id.clone()), native_item_id: Some(id.clone()),
parent_id: None, parent_id: None,
@ -339,7 +329,8 @@ fn thread_item_to_item(item: &schema::ThreadItem, status: ItemStatus) -> Univers
output, output,
}); });
} else { } else {
let arguments = serde_json::to_string(arguments).unwrap_or_else(|_| "{}".to_string()); let arguments =
serde_json::to_string(arguments).unwrap_or_else(|_| "{}".to_string());
parts.push(ContentPart::ToolCall { parts.push(ContentPart::ToolCall {
name: format!("{server}.{tool}"), name: format!("{server}.{tool}"),
arguments, arguments,
@ -433,18 +424,12 @@ fn thread_item_to_item(item: &schema::ThreadItem, status: ItemStatus) -> Univers
}], }],
status, status,
}, },
schema::ThreadItem::EnteredReviewMode { id, review } => status_item_internal( schema::ThreadItem::EnteredReviewMode { id, review } => {
id, status_item_internal(id, "review.entered", Some(review.clone()), status)
"review.entered", }
Some(review.clone()), schema::ThreadItem::ExitedReviewMode { id, review } => {
status, status_item_internal(id, "review.exited", Some(review.clone()), status)
), }
schema::ThreadItem::ExitedReviewMode { id, review } => status_item_internal(
id,
"review.exited",
Some(review.clone()),
status,
),
} }
} }
@ -463,7 +448,12 @@ fn status_item(label: &str, detail: Option<String>) -> UniversalItem {
} }
} }
fn status_item_internal(id: &str, label: &str, detail: Option<String>, status: ItemStatus) -> UniversalItem { fn status_item_internal(
id: &str,
label: &str,
detail: Option<String>,
status: ItemStatus,
) -> UniversalItem {
UniversalItem { UniversalItem {
item_id: String::new(), item_id: String::new(),
native_item_id: Some(id.to_string()), native_item_id: Some(id.to_string()),

View file

@ -2,46 +2,42 @@ use serde_json::Value;
use crate::opencode as schema; use crate::opencode as schema;
use crate::{ use crate::{
ContentPart, ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus,
EventConversion, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, SessionStartedData,
ItemDeltaData, UniversalEventData, UniversalEventType, UniversalItem,
ItemEventData,
ItemKind,
ItemRole,
ItemStatus,
PermissionEventData,
PermissionStatus,
QuestionEventData,
QuestionStatus,
SessionStartedData,
UniversalEventData,
UniversalEventType,
UniversalItem,
}; };
pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>, String> { pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>, String> {
let raw = serde_json::to_value(event).ok(); let raw = serde_json::to_value(event).ok();
match event { match event {
schema::Event::MessageUpdated(updated) => { schema::Event::MessageUpdated(updated) => {
let schema::EventMessageUpdated { properties, type_: _ } = updated; let schema::EventMessageUpdated {
properties,
type_: _,
} = updated;
let schema::EventMessageUpdatedProperties { info } = properties; let schema::EventMessageUpdatedProperties { info } = properties;
let (mut item, completed, session_id) = message_to_item(info); let (mut item, completed, session_id) = message_to_item(info);
item.status = if completed { ItemStatus::Completed } else { ItemStatus::InProgress }; item.status = if completed {
ItemStatus::Completed
} else {
ItemStatus::InProgress
};
let event_type = if completed { let event_type = if completed {
UniversalEventType::ItemCompleted UniversalEventType::ItemCompleted
} else { } else {
UniversalEventType::ItemStarted UniversalEventType::ItemStarted
}; };
let conversion = EventConversion::new( let conversion =
event_type, EventConversion::new(event_type, UniversalEventData::Item(ItemEventData { item }))
UniversalEventData::Item(ItemEventData { item }),
)
.with_native_session(session_id) .with_native_session(session_id)
.with_raw(raw); .with_raw(raw);
Ok(vec![conversion]) Ok(vec![conversion])
} }
schema::Event::MessagePartUpdated(updated) => { schema::Event::MessagePartUpdated(updated) => {
let schema::EventMessagePartUpdated { properties, type_: _ } = updated; let schema::EventMessagePartUpdated {
properties,
type_: _,
} = updated;
let schema::EventMessagePartUpdatedProperties { part, delta } = properties; let schema::EventMessagePartUpdatedProperties { part, delta } = properties;
let mut events = Vec::new(); let mut events = Vec::new();
let (session_id, message_id) = part_session_message(part); let (session_id, message_id) = part_session_message(part);
@ -122,11 +118,16 @@ pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>,
schema::Part::Variant4(tool_part) => { schema::Part::Variant4(tool_part) => {
let tool_events = tool_part_to_events(&tool_part, &message_id); let tool_events = tool_part_to_events(&tool_part, &message_id);
for event in tool_events { for event in tool_events {
events.push(event.with_native_session(session_id.clone()).with_raw(raw.clone())); events.push(
event
.with_native_session(session_id.clone())
.with_raw(raw.clone()),
);
} }
} }
schema::Part::Variant1 { .. } => { schema::Part::Variant1 { .. } => {
let detail = serde_json::to_string(part).unwrap_or_else(|_| "subtask".to_string()); let detail =
serde_json::to_string(part).unwrap_or_else(|_| "subtask".to_string());
let item = status_item("subtask", Some(detail)); let item = status_item("subtask", Some(detail));
events.push( events.push(
EventConversion::new( EventConversion::new(
@ -160,7 +161,10 @@ pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>,
Ok(events) Ok(events)
} }
schema::Event::QuestionAsked(asked) => { schema::Event::QuestionAsked(asked) => {
let schema::EventQuestionAsked { properties, type_: _ } = asked; let schema::EventQuestionAsked {
properties,
type_: _,
} = asked;
let question = question_from_opencode(properties); let question = question_from_opencode(properties);
let conversion = EventConversion::new( let conversion = EventConversion::new(
UniversalEventType::QuestionRequested, UniversalEventType::QuestionRequested,
@ -171,7 +175,10 @@ pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>,
Ok(vec![conversion]) Ok(vec![conversion])
} }
schema::Event::PermissionAsked(asked) => { schema::Event::PermissionAsked(asked) => {
let schema::EventPermissionAsked { properties, type_: _ } = asked; let schema::EventPermissionAsked {
properties,
type_: _,
} = asked;
let permission = permission_from_opencode(properties); let permission = permission_from_opencode(properties);
let conversion = EventConversion::new( let conversion = EventConversion::new(
UniversalEventType::PermissionRequested, UniversalEventType::PermissionRequested,
@ -182,7 +189,10 @@ pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>,
Ok(vec![conversion]) Ok(vec![conversion])
} }
schema::Event::SessionCreated(created) => { schema::Event::SessionCreated(created) => {
let schema::EventSessionCreated { properties, type_: _ } = created; let schema::EventSessionCreated {
properties,
type_: _,
} = created;
let metadata = serde_json::to_value(&properties.info).ok(); let metadata = serde_json::to_value(&properties.info).ok();
let conversion = EventConversion::new( let conversion = EventConversion::new(
UniversalEventType::SessionStarted, UniversalEventType::SessionStarted,
@ -193,8 +203,12 @@ pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>,
Ok(vec![conversion]) Ok(vec![conversion])
} }
schema::Event::SessionStatus(status) => { schema::Event::SessionStatus(status) => {
let schema::EventSessionStatus { properties, type_: _ } = status; let schema::EventSessionStatus {
let detail = serde_json::to_string(&properties.status).unwrap_or_else(|_| "status".to_string()); properties,
type_: _,
} = status;
let detail =
serde_json::to_string(&properties.status).unwrap_or_else(|_| "status".to_string());
let item = status_item("session.status", Some(detail)); let item = status_item("session.status", Some(detail));
let conversion = EventConversion::new( let conversion = EventConversion::new(
UniversalEventType::ItemCompleted, UniversalEventType::ItemCompleted,
@ -205,7 +219,10 @@ pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>,
Ok(vec![conversion]) Ok(vec![conversion])
} }
schema::Event::SessionIdle(idle) => { schema::Event::SessionIdle(idle) => {
let schema::EventSessionIdle { properties, type_: _ } = idle; let schema::EventSessionIdle {
properties,
type_: _,
} = idle;
let item = status_item("session.idle", None); let item = status_item("session.idle", None);
let conversion = EventConversion::new( let conversion = EventConversion::new(
UniversalEventType::ItemCompleted, UniversalEventType::ItemCompleted,
@ -216,8 +233,12 @@ pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>,
Ok(vec![conversion]) Ok(vec![conversion])
} }
schema::Event::SessionError(error) => { schema::Event::SessionError(error) => {
let schema::EventSessionError { properties, type_: _ } = error; let schema::EventSessionError {
let detail = serde_json::to_string(&properties.error).unwrap_or_else(|_| "session error".to_string()); properties,
type_: _,
} = error;
let detail = serde_json::to_string(&properties.error)
.unwrap_or_else(|_| "session error".to_string());
let item = status_item("session.error", Some(detail)); let item = status_item("session.error", Some(detail));
let conversion = EventConversion::new( let conversion = EventConversion::new(
UniversalEventType::ItemCompleted, UniversalEventType::ItemCompleted,
@ -270,7 +291,11 @@ fn message_to_item(message: &schema::Message) -> (UniversalItem, bool, Option<St
kind: ItemKind::Message, kind: ItemKind::Message,
role: Some(ItemRole::Assistant), role: Some(ItemRole::Assistant),
content: Vec::new(), content: Vec::new(),
status: if completed { ItemStatus::Completed } else { ItemStatus::InProgress }, status: if completed {
ItemStatus::Completed
} else {
ItemStatus::InProgress
},
}, },
completed, completed,
Some(session_id.clone()), Some(session_id.clone()),
@ -281,9 +306,10 @@ fn message_to_item(message: &schema::Message) -> (UniversalItem, bool, Option<St
fn part_session_message(part: &schema::Part) -> (Option<String>, String) { fn part_session_message(part: &schema::Part) -> (Option<String>, String) {
match part { match part {
schema::Part::Variant0(text_part) => { schema::Part::Variant0(text_part) => (
(Some(text_part.session_id.clone()), text_part.message_id.clone()) Some(text_part.session_id.clone()),
} text_part.message_id.clone(),
),
schema::Part::Variant1 { schema::Part::Variant1 {
session_id, session_id,
message_id, message_id,

View file

@ -1,13 +1,16 @@
use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use schemars::JsonSchema;
use utoipa::ToSchema; use utoipa::ToSchema;
pub use sandbox_agent_extracted_agent_schemas::{amp, claude, codex, opencode}; pub use sandbox_agent_extracted_agent_schemas::{amp, claude, codex, opencode};
pub mod agents; pub mod agents;
pub use agents::{amp as convert_amp, claude as convert_claude, codex as convert_codex, opencode as convert_opencode}; pub use agents::{
amp as convert_amp, claude as convert_claude, codex as convert_codex,
opencode as convert_opencode,
};
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
pub struct UniversalEvent { pub struct UniversalEvent {
@ -221,14 +224,38 @@ pub enum ItemStatus {
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(tag = "type", rename_all = "snake_case")] #[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentPart { pub enum ContentPart {
Text { text: String }, Text {
Json { json: Value }, text: String,
ToolCall { name: String, arguments: String, call_id: String }, },
ToolResult { call_id: String, output: String }, Json {
FileRef { path: String, action: FileAction, diff: Option<String> }, json: Value,
Reasoning { text: String, visibility: ReasoningVisibility }, },
Image { path: String, mime: Option<String> }, ToolCall {
Status { label: String, detail: Option<String> }, name: String,
arguments: String,
call_id: String,
},
ToolResult {
call_id: String,
output: String,
},
FileRef {
path: String,
action: FileAction,
diff: Option<String>,
},
Reasoning {
text: String,
visibility: ReasoningVisibility,
},
Image {
path: String,
mime: Option<String>,
},
Status {
label: String,
detail: Option<String>,
},
} }
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]