feat: sync universal schema and sdk updates

This commit is contained in:
Nathan Flurry 2026-01-27 02:52:25 -08:00
parent 79bb441287
commit f5d1a6383d
56 changed files with 6800 additions and 3974 deletions

View file

@ -1,5 +1,5 @@
[package]
name = "sandbox-agent-core"
name = "sandbox-agent"
version.workspace = true
edition.workspace = true
authors.workspace = true

View file

@ -10,13 +10,13 @@ use sandbox_agent_agent_management::credentials::{
extract_all_credentials, AuthType, CredentialExtractionOptions, ExtractedCredentials,
ProviderCredentials,
};
use sandbox_agent_core::router::{
use sandbox_agent::router::{
AgentInstallRequest, AppState, AuthConfig, CreateSessionRequest, MessageRequest,
PermissionReply, PermissionReplyRequest, QuestionReplyRequest,
};
use sandbox_agent_core::router::{AgentListResponse, AgentModesResponse, CreateSessionResponse, EventsResponse};
use sandbox_agent_core::router::build_router;
use sandbox_agent_core::ui;
use sandbox_agent::router::{AgentListResponse, AgentModesResponse, CreateSessionResponse, EventsResponse};
use sandbox_agent::router::build_router;
use sandbox_agent::ui;
use serde::Serialize;
use serde_json::Value;
use thiserror::Error;
@ -118,6 +118,9 @@ enum SessionsCommand {
#[command(name = "send-message")]
/// Send a message to an existing session.
SendMessage(SessionMessageArgs),
#[command(name = "terminate")]
/// Terminate a session.
Terminate(SessionTerminateArgs),
#[command(name = "get-messages")]
/// Alias for events; returns session events.
GetMessages(SessionEventsArgs),
@ -195,6 +198,8 @@ struct SessionEventsArgs {
offset: Option<u64>,
#[arg(long, short = 'l')]
limit: Option<u64>,
#[arg(long)]
include_raw: bool,
#[command(flatten)]
client: ClientArgs,
}
@ -204,6 +209,15 @@ struct SessionEventsSseArgs {
session_id: String,
#[arg(long, short = 'o')]
offset: Option<u64>,
#[arg(long)]
include_raw: bool,
#[command(flatten)]
client: ClientArgs,
}
#[derive(Args, Debug)]
struct SessionTerminateArgs {
session_id: String,
#[command(flatten)]
client: ClientArgs,
}
@ -419,16 +433,41 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> {
let response = ctx.post(&path, &body)?;
print_empty_response(response)
}
SessionsCommand::Terminate(args) => {
let ctx = ClientContext::new(cli, &args.client)?;
let path = format!("{API_PREFIX}/sessions/{}/terminate", args.session_id);
let response = ctx.post_empty(&path)?;
print_empty_response(response)
}
SessionsCommand::GetMessages(args) | SessionsCommand::Events(args) => {
let ctx = ClientContext::new(cli, &args.client)?;
let path = format!("{API_PREFIX}/sessions/{}/events", args.session_id);
let response = ctx.get_with_query(&path, &[ ("offset", args.offset), ("limit", args.limit) ])?;
let response = ctx.get_with_query(
&path,
&[
("offset", args.offset.map(|v| v.to_string())),
("limit", args.limit.map(|v| v.to_string())),
(
"include_raw",
if args.include_raw { Some("true".to_string()) } else { None },
),
],
)?;
print_json_response::<EventsResponse>(response)
}
SessionsCommand::EventsSse(args) => {
let ctx = ClientContext::new(cli, &args.client)?;
let path = format!("{API_PREFIX}/sessions/{}/events/sse", args.session_id);
let response = ctx.get_with_query(&path, &[("offset", args.offset)])?;
let response = ctx.get_with_query(
&path,
&[
("offset", args.offset.map(|v| v.to_string())),
(
"include_raw",
if args.include_raw { Some("true".to_string()) } else { None },
),
],
)?;
print_text_response(response)
}
SessionsCommand::ReplyQuestion(args) => {
@ -786,7 +825,7 @@ impl ClientContext {
fn get_with_query(
&self,
path: &str,
query: &[(&str, Option<u64>)],
query: &[(&str, Option<String>)],
) -> Result<reqwest::blocking::Response, CliError> {
let mut request = self.request(Method::GET, path);
for (key, value) in query {

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,657 @@
use std::collections::HashMap;
use std::time::{Duration, Instant};
use axum::body::Body;
use axum::http::{Method, Request, StatusCode};
use axum::Router;
use http_body_util::BodyExt;
use serde_json::{json, Value};
use tempfile::TempDir;
use tower::util::ServiceExt;
use sandbox_agent_agent_management::agents::{AgentId, AgentManager};
use sandbox_agent_agent_management::testing::test_agents_from_env;
use sandbox_agent_agent_credentials::ExtractedCredentials;
use sandbox_agent::router::{
build_router,
AgentCapabilities,
AgentListResponse,
AuthConfig,
};
const PROMPT: &str = "Reply with exactly the single word OK.";
const TOOL_PROMPT: &str =
"Use the bash tool to run `ls` in the current directory. Do not answer without using the tool.";
const QUESTION_PROMPT: &str =
"Call the AskUserQuestion tool with exactly one yes/no question and wait for a reply. Do not answer yourself.";
/// Agent-agnostic event sequence tests.
///
/// These tests assert that the universal schema output is valid and consistent
/// across agents, and they use capability flags from /v1/agents to skip
/// unsupported flows.
struct TestApp {
app: Router,
_install_dir: TempDir,
}
impl TestApp {
fn new() -> Self {
let install_dir = tempfile::tempdir().expect("create temp install dir");
let manager = AgentManager::new(install_dir.path())
.expect("create agent manager");
let state = sandbox_agent::router::AppState::new(AuthConfig::disabled(), manager);
let app = build_router(state);
Self {
app,
_install_dir: install_dir,
}
}
}
struct EnvGuard {
saved: HashMap<String, Option<String>>,
}
impl Drop for EnvGuard {
fn drop(&mut self) {
for (key, value) in &self.saved {
match value {
Some(value) => std::env::set_var(key, value),
None => std::env::remove_var(key),
}
}
}
}
fn apply_credentials(creds: &ExtractedCredentials) -> EnvGuard {
let keys = ["ANTHROPIC_API_KEY", "CLAUDE_API_KEY", "OPENAI_API_KEY", "CODEX_API_KEY"];
let mut saved = HashMap::new();
for key in keys {
saved.insert(key.to_string(), std::env::var(key).ok());
}
match creds.anthropic.as_ref() {
Some(cred) => {
std::env::set_var("ANTHROPIC_API_KEY", &cred.api_key);
std::env::set_var("CLAUDE_API_KEY", &cred.api_key);
}
None => {
std::env::remove_var("ANTHROPIC_API_KEY");
std::env::remove_var("CLAUDE_API_KEY");
}
}
match creds.openai.as_ref() {
Some(cred) => {
std::env::set_var("OPENAI_API_KEY", &cred.api_key);
std::env::set_var("CODEX_API_KEY", &cred.api_key);
}
None => {
std::env::remove_var("OPENAI_API_KEY");
std::env::remove_var("CODEX_API_KEY");
}
}
EnvGuard { saved }
}
async fn send_json(
app: &Router,
method: Method,
path: &str,
body: Option<Value>,
) -> (StatusCode, Value) {
let request = Request::builder()
.method(method)
.uri(path)
.header("content-type", "application/json")
.body(Body::from(body.map(|value| value.to_string()).unwrap_or_default()))
.expect("request");
let response = app
.clone()
.oneshot(request)
.await
.expect("response");
let status = response.status();
let bytes = response
.into_body()
.collect()
.await
.expect("body")
.to_bytes();
let payload = if bytes.is_empty() {
Value::Null
} else {
serde_json::from_slice(&bytes).unwrap_or(Value::Null)
};
(status, payload)
}
async fn send_status(app: &Router, method: Method, path: &str, body: Option<Value>) -> StatusCode {
let (status, _) = send_json(app, method, path, body).await;
status
}
async fn install_agent(app: &Router, agent: AgentId) {
let status = send_status(
app,
Method::POST,
&format!("/v1/agents/{}/install", agent.as_str()),
Some(json!({})),
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT, "install agent {}", agent.as_str());
}
async fn create_session(app: &Router, agent: AgentId, session_id: &str, permission_mode: &str) {
let status = send_status(
app,
Method::POST,
&format!("/v1/sessions/{session_id}"),
Some(json!({
"agent": agent.as_str(),
"permissionMode": permission_mode,
})),
)
.await;
assert_eq!(status, StatusCode::OK, "create session");
}
async fn create_session_with_mode(
app: &Router,
agent: AgentId,
session_id: &str,
agent_mode: &str,
permission_mode: &str,
) {
let status = send_status(
app,
Method::POST,
&format!("/v1/sessions/{session_id}"),
Some(json!({
"agent": agent.as_str(),
"agentMode": agent_mode,
"permissionMode": permission_mode,
})),
)
.await;
assert_eq!(status, StatusCode::OK, "create session");
}
fn test_permission_mode(agent: AgentId) -> &'static str {
match agent {
AgentId::Opencode => "default",
_ => "bypass",
}
}
async fn send_message(app: &Router, session_id: &str, message: &str) {
let status = send_status(
app,
Method::POST,
&format!("/v1/sessions/{session_id}/messages"),
Some(json!({ "message": message })),
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT, "send message");
}
async fn poll_events_until<F>(
app: &Router,
session_id: &str,
timeout: Duration,
mut stop: F,
) -> Vec<Value>
where
F: FnMut(&[Value]) -> bool,
{
let start = Instant::now();
let mut offset = 0u64;
let mut events = Vec::new();
while start.elapsed() < timeout {
let path = format!("/v1/sessions/{session_id}/events?offset={offset}&limit=200");
let (status, payload) = send_json(app, Method::GET, &path, None).await;
assert_eq!(status, StatusCode::OK, "poll events");
let new_events = payload
.get("events")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
if !new_events.is_empty() {
if let Some(last) = new_events
.last()
.and_then(|event| event.get("sequence"))
.and_then(Value::as_u64)
{
offset = last;
}
events.extend(new_events);
if stop(&events) {
break;
}
}
tokio::time::sleep(Duration::from_millis(800)).await;
}
events
}
async fn fetch_capabilities(app: &Router) -> HashMap<String, AgentCapabilities> {
let (status, payload) = send_json(app, Method::GET, "/v1/agents", None).await;
assert_eq!(status, StatusCode::OK, "list agents");
let response: AgentListResponse = serde_json::from_value(payload).expect("agents payload");
response
.agents
.into_iter()
.map(|agent| (agent.id, agent.capabilities))
.collect()
}
fn has_event_type(events: &[Value], event_type: &str) -> bool {
events
.iter()
.any(|event| event.get("type").and_then(Value::as_str) == Some(event_type))
}
fn find_assistant_message_item(events: &[Value]) -> Option<String> {
events.iter().find_map(|event| {
if event.get("type").and_then(Value::as_str) != Some("item.completed") {
return None;
}
let item = event.get("data")?.get("item")?;
let role = item.get("role")?.as_str()?;
let kind = item.get("kind")?.as_str()?;
if role != "assistant" || kind != "message" {
return None;
}
item.get("item_id")?.as_str().map(|id| id.to_string())
})
}
fn event_sequence(event: &Value) -> Option<u64> {
event.get("sequence").and_then(Value::as_u64)
}
fn find_item_event_seq(events: &[Value], event_type: &str, item_id: &str) -> Option<u64> {
events.iter().find_map(|event| {
if event.get("type").and_then(Value::as_str) != Some(event_type) {
return None;
}
match event_type {
"item.delta" => {
let data = event.get("data")?;
let id = data.get("item_id")?.as_str()?;
if id == item_id {
event_sequence(event)
} else {
None
}
}
_ => {
let item = event.get("data")?.get("item")?;
let id = item.get("item_id")?.as_str()?;
if id == item_id {
event_sequence(event)
} else {
None
}
}
}
})
}
fn find_permission_id(events: &[Value]) -> Option<String> {
events.iter().find_map(|event| {
if event.get("type").and_then(Value::as_str) != Some("permission.requested") {
return None;
}
event
.get("data")
.and_then(|data| data.get("permission_id"))
.and_then(Value::as_str)
.map(|id| id.to_string())
})
}
fn find_question_id(events: &[Value]) -> Option<String> {
events.iter().find_map(|event| {
if event.get("type").and_then(Value::as_str) != Some("question.requested") {
return None;
}
event
.get("data")
.and_then(|data| data.get("question_id"))
.and_then(Value::as_str)
.map(|id| id.to_string())
})
}
fn find_first_answer(events: &[Value]) -> Option<Vec<Vec<String>>> {
events.iter().find_map(|event| {
if event.get("type").and_then(Value::as_str) != Some("question.requested") {
return None;
}
let options = event
.get("data")
.and_then(|data| data.get("options"))
.and_then(Value::as_array)?;
let option = options.first()?.as_str()?.to_string();
Some(vec![vec![option]])
})
}
fn find_tool_call(events: &[Value]) -> Option<String> {
events.iter().find_map(|event| {
if event.get("type").and_then(Value::as_str) != Some("item.started")
&& event.get("type").and_then(Value::as_str) != Some("item.completed")
{
return None;
}
let item = event.get("data")?.get("item")?;
let kind = item.get("kind")?.as_str()?;
if kind != "tool_call" {
return None;
}
item.get("item_id")?.as_str().map(|id| id.to_string())
})
}
fn has_tool_result(events: &[Value]) -> bool {
events.iter().any(|event| {
if event.get("type").and_then(Value::as_str) != Some("item.completed") {
return false;
}
let item = match event.get("data").and_then(|data| data.get("item")) {
Some(item) => item,
None => return false,
};
item.get("kind").and_then(Value::as_str) == Some("tool_result")
})
}
fn expect_basic_sequence(events: &[Value]) {
assert!(has_event_type(events, "session.started"), "session.started missing");
let item_id = find_assistant_message_item(events).expect("assistant message missing");
let started_seq = find_item_event_seq(events, "item.started", &item_id)
.expect("item.started missing");
// Intentionally require deltas here to validate our synthetic delta behavior.
let delta_seq = find_item_event_seq(events, "item.delta", &item_id)
.expect("item.delta missing");
let completed_seq = find_item_event_seq(events, "item.completed", &item_id)
.expect("item.completed missing");
assert!(started_seq < delta_seq, "item.started must precede delta");
assert!(delta_seq < completed_seq, "delta must precede completion");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_agnostic_basic_reply() {
let configs = test_agents_from_env().expect("configure SANDBOX_TEST_AGENTS or install agents");
let app = TestApp::new();
let capabilities = fetch_capabilities(&app.app).await;
for config in &configs {
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_id = format!("basic-{}", config.agent.as_str());
create_session(&app.app, config.agent, &session_id, "default").await;
send_message(&app.app, &session_id, PROMPT).await;
let events = poll_events_until(&app.app, &session_id, Duration::from_secs(120), |events| {
has_event_type(events, "error") || find_assistant_message_item(events).is_some()
})
.await;
assert!(
!events.is_empty(),
"no events collected for {}",
config.agent.as_str()
);
expect_basic_sequence(&events);
let caps = capabilities
.get(config.agent.as_str())
.expect("capabilities missing");
if caps.tool_calls {
assert!(
!events.iter().any(|event| {
event.get("type").and_then(Value::as_str) == Some("agent.unparsed")
}),
"agent.unparsed event detected"
);
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_agnostic_tool_flow() {
let configs = test_agents_from_env().expect("configure SANDBOX_TEST_AGENTS or install agents");
let app = TestApp::new();
let capabilities = fetch_capabilities(&app.app).await;
for config in &configs {
let caps = capabilities
.get(config.agent.as_str())
.expect("capabilities missing");
if !caps.tool_calls {
continue;
}
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_id = format!("tool-{}", config.agent.as_str());
create_session(&app.app, config.agent, &session_id, test_permission_mode(config.agent)).await;
send_message(&app.app, &session_id, TOOL_PROMPT).await;
let start = Instant::now();
let mut offset = 0u64;
let mut events = Vec::new();
let mut replied = false;
while start.elapsed() < Duration::from_secs(180) {
let path = format!("/v1/sessions/{session_id}/events?offset={offset}&limit=200");
let (status, payload) = send_json(&app.app, Method::GET, &path, None).await;
assert_eq!(status, StatusCode::OK, "poll events");
let new_events = payload
.get("events")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
if !new_events.is_empty() {
if let Some(last) = new_events
.last()
.and_then(|event| event.get("sequence"))
.and_then(Value::as_u64)
{
offset = last;
}
events.extend(new_events);
if !replied {
if let Some(permission_id) = find_permission_id(&events) {
let _ = send_status(
&app.app,
Method::POST,
&format!(
"/v1/sessions/{session_id}/permissions/{permission_id}/reply"
),
Some(json!({ "reply": "once" })),
)
.await;
replied = true;
}
}
if has_tool_result(&events) {
break;
}
}
tokio::time::sleep(Duration::from_millis(800)).await;
}
let tool_call = find_tool_call(&events);
let tool_result = has_tool_result(&events);
assert!(
tool_call.is_some(),
"tool_call missing for tool-capable agent {}",
config.agent.as_str()
);
if tool_call.is_some() {
assert!(
tool_result,
"tool_result missing after tool_call for {}",
config.agent.as_str()
);
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_agnostic_permission_flow() {
let configs = test_agents_from_env().expect("configure SANDBOX_TEST_AGENTS or install agents");
let app = TestApp::new();
let capabilities = fetch_capabilities(&app.app).await;
for config in &configs {
let caps = capabilities
.get(config.agent.as_str())
.expect("capabilities missing");
if !(caps.plan_mode && caps.permissions) {
continue;
}
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_id = format!("perm-{}", config.agent.as_str());
create_session(&app.app, config.agent, &session_id, "plan").await;
send_message(&app.app, &session_id, TOOL_PROMPT).await;
let events = poll_events_until(&app.app, &session_id, Duration::from_secs(120), |events| {
find_permission_id(events).is_some() || has_event_type(events, "error")
})
.await;
let permission_id = find_permission_id(&events).expect("permission.requested missing");
let status = send_status(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_id}/permissions/{permission_id}/reply"),
Some(json!({ "reply": "once" })),
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT, "permission reply");
let resolved = poll_events_until(&app.app, &session_id, Duration::from_secs(120), |events| {
events.iter().any(|event| {
event.get("type").and_then(Value::as_str) == Some("permission.resolved")
})
})
.await;
assert!(
resolved.iter().any(|event| {
event.get("type").and_then(Value::as_str) == Some("permission.resolved")
&& event
.get("synthetic")
.and_then(Value::as_bool)
.unwrap_or(false)
}),
"permission.resolved should be synthetic"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_agnostic_question_flow() {
let configs = test_agents_from_env().expect("configure SANDBOX_TEST_AGENTS or install agents");
let app = TestApp::new();
let capabilities = fetch_capabilities(&app.app).await;
for config in &configs {
let caps = capabilities
.get(config.agent.as_str())
.expect("capabilities missing");
if !caps.questions {
continue;
}
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_id = format!("question-{}", config.agent.as_str());
create_session_with_mode(&app.app, config.agent, &session_id, "plan", "plan").await;
send_message(&app.app, &session_id, QUESTION_PROMPT).await;
let events = poll_events_until(&app.app, &session_id, Duration::from_secs(120), |events| {
find_question_id(events).is_some() || has_event_type(events, "error")
})
.await;
let question_id = find_question_id(&events).expect("question.requested missing");
let answers = find_first_answer(&events).unwrap_or_else(|| vec![vec![]]);
let status = send_status(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_id}/questions/{question_id}/reply"),
Some(json!({ "answers": answers })),
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT, "question reply");
let resolved = poll_events_until(&app.app, &session_id, Duration::from_secs(120), |events| {
events.iter().any(|event| {
event.get("type").and_then(Value::as_str) == Some("question.resolved")
})
})
.await;
assert!(
resolved.iter().any(|event| {
event.get("type").and_then(Value::as_str) == Some("question.resolved")
&& event
.get("synthetic")
.and_then(Value::as_bool)
.unwrap_or(false)
}),
"question.resolved should be synthetic"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn agent_agnostic_termination() {
let configs = test_agents_from_env().expect("configure SANDBOX_TEST_AGENTS or install agents");
let app = TestApp::new();
for config in &configs {
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_id = format!("terminate-{}", config.agent.as_str());
create_session(&app.app, config.agent, &session_id, "default").await;
let status = send_status(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_id}/terminate"),
None,
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT, "terminate session");
let events = poll_events_until(&app.app, &session_id, Duration::from_secs(30), |events| {
has_event_type(events, "session.ended")
})
.await;
assert!(has_event_type(&events, "session.ended"), "missing session.ended");
let status = send_status(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_id}/messages"),
Some(json!({ "message": PROMPT })),
)
.await;
assert!(!status.is_success(), "terminated session should reject messages");
}
}

View file

@ -12,7 +12,7 @@ use tempfile::TempDir;
use sandbox_agent_agent_management::agents::{AgentId, AgentManager};
use sandbox_agent_agent_management::testing::{test_agents_from_env, TestAgentConfig};
use sandbox_agent_agent_credentials::ExtractedCredentials;
use sandbox_agent_core::router::{build_router, AppState, AuthConfig};
use sandbox_agent::router::{build_router, AppState, AuthConfig};
use tower::util::ServiceExt;
use tower_http::cors::CorsLayer;
@ -226,7 +226,11 @@ async fn poll_events_until(
.cloned()
.unwrap_or_default();
if !new_events.is_empty() {
if let Some(last) = new_events.last().and_then(|event| event.get("id")).and_then(Value::as_u64) {
if let Some(last) = new_events
.last()
.and_then(|event| event.get("sequence"))
.and_then(Value::as_u64)
{
offset = last;
}
events.extend(new_events);
@ -307,26 +311,48 @@ fn should_stop(events: &[Value]) -> bool {
fn is_assistant_message(event: &Value) -> bool {
event
.get("data")
.and_then(|data| data.get("message"))
.and_then(|message| message.get("role"))
.get("type")
.and_then(Value::as_str)
.map(|role| role == "assistant")
.map(|event_type| event_type == "item.completed")
.unwrap_or(false)
&& event
.get("data")
.and_then(|data| data.get("item"))
.and_then(|item| item.get("role"))
.and_then(Value::as_str)
.map(|role| role == "assistant")
.unwrap_or(false)
}
fn is_error_event(event: &Value) -> bool {
matches!(
event.get("type").and_then(Value::as_str),
Some("error") | Some("agent.unparsed")
)
}
fn is_unparsed_event(event: &Value) -> bool {
event
.get("data")
.and_then(|data| data.get("error"))
.is_some()
.get("type")
.and_then(Value::as_str)
.map(|value| value == "agent.unparsed")
.unwrap_or(false)
}
fn is_permission_event(event: &Value) -> bool {
event
.get("data")
.and_then(|data| data.get("permissionAsked"))
.is_some()
.get("type")
.and_then(Value::as_str)
.map(|value| value == "permission.requested")
.unwrap_or(false)
}
fn is_question_event(event: &Value) -> bool {
event
.get("type")
.and_then(Value::as_str)
.map(|value| value == "question.requested")
.unwrap_or(false)
}
fn truncate_permission_events(events: &[Value]) -> Vec<Value> {
@ -339,7 +365,21 @@ fn truncate_permission_events(events: &[Value]) -> Vec<Value> {
events.to_vec()
}
fn truncate_question_events(events: &[Value]) -> Vec<Value> {
if let Some(idx) = events.iter().position(is_question_event) {
return events[..=idx].to_vec();
}
if let Some(idx) = events.iter().position(is_assistant_message) {
return events[..=idx].to_vec();
}
events.to_vec()
}
fn normalize_events(events: &[Value]) -> Value {
assert!(
!events.iter().any(is_unparsed_event),
"agent.unparsed event encountered"
);
let normalized = events
.iter()
.enumerate()
@ -361,77 +401,100 @@ fn truncate_after_first_stop(events: &[Value]) -> Vec<Value> {
fn normalize_event(event: &Value, seq: usize) -> Value {
let mut map = Map::new();
map.insert("seq".to_string(), Value::Number(seq.into()));
if let Some(agent) = event.get("agent").and_then(Value::as_str) {
map.insert("agent".to_string(), Value::String(agent.to_string()));
if let Some(event_type) = event.get("type").and_then(Value::as_str) {
map.insert("type".to_string(), Value::String(event_type.to_string()));
}
if let Some(source) = event.get("source").and_then(Value::as_str) {
map.insert("source".to_string(), Value::String(source.to_string()));
}
if let Some(synthetic) = event.get("synthetic").and_then(Value::as_bool) {
map.insert("synthetic".to_string(), Value::Bool(synthetic));
}
let data = event.get("data").unwrap_or(&Value::Null);
if let Some(message) = data.get("message") {
map.insert("kind".to_string(), Value::String("message".to_string()));
map.insert("message".to_string(), normalize_message(message));
} else if let Some(started) = data.get("started") {
map.insert("kind".to_string(), Value::String("started".to_string()));
map.insert("started".to_string(), normalize_started(started));
} else if let Some(error) = data.get("error") {
map.insert("kind".to_string(), Value::String("error".to_string()));
map.insert("error".to_string(), normalize_error(error));
} else if let Some(question) = data.get("questionAsked") {
map.insert("kind".to_string(), Value::String("question".to_string()));
map.insert("question".to_string(), normalize_question(question));
} else if let Some(permission) = data.get("permissionAsked") {
map.insert("kind".to_string(), Value::String("permission".to_string()));
map.insert("permission".to_string(), normalize_permission(permission));
} else {
map.insert("kind".to_string(), Value::String("unknown".to_string()));
match event.get("type").and_then(Value::as_str).unwrap_or("") {
"session.started" => {
map.insert("session".to_string(), Value::String("started".to_string()));
if data.get("metadata").is_some() {
map.insert("metadata".to_string(), Value::Bool(true));
}
}
"session.ended" => {
map.insert("session".to_string(), Value::String("ended".to_string()));
map.insert("ended".to_string(), normalize_session_end(data));
}
"item.started" | "item.completed" => {
if let Some(item) = data.get("item") {
map.insert("item".to_string(), normalize_item(item));
}
}
"item.delta" => {
let mut delta = Map::new();
if data.get("item_id").is_some() {
delta.insert("item_id".to_string(), Value::String("<redacted>".to_string()));
}
if data.get("native_item_id").is_some() {
delta.insert("native_item_id".to_string(), Value::String("<redacted>".to_string()));
}
if data.get("delta").is_some() {
delta.insert("delta".to_string(), Value::String("<redacted>".to_string()));
}
map.insert("delta".to_string(), Value::Object(delta));
}
"permission.requested" | "permission.resolved" => {
map.insert("permission".to_string(), normalize_permission(data));
}
"question.requested" | "question.resolved" => {
map.insert("question".to_string(), normalize_question(data));
}
"error" => {
map.insert("error".to_string(), normalize_error(data));
}
"agent.unparsed" => {
map.insert("unparsed".to_string(), Value::Bool(true));
}
_ => {}
}
Value::Object(map)
}
fn normalize_message(message: &Value) -> Value {
fn normalize_item(item: &Value) -> Value {
let mut map = Map::new();
if let Some(role) = message.get("role").and_then(Value::as_str) {
if let Some(kind) = item.get("kind").and_then(Value::as_str) {
map.insert("kind".to_string(), Value::String(kind.to_string()));
}
if let Some(role) = item.get("role").and_then(Value::as_str) {
map.insert("role".to_string(), Value::String(role.to_string()));
}
if let Some(parts) = message.get("parts").and_then(Value::as_array) {
let parts = parts.iter().map(normalize_part).collect::<Vec<_>>();
map.insert("parts".to_string(), Value::Array(parts));
} else if message.get("raw").is_some() {
map.insert("unparsed".to_string(), Value::Bool(true));
if let Some(status) = item.get("status").and_then(Value::as_str) {
map.insert("status".to_string(), Value::String(status.to_string()));
}
if let Some(content) = item.get("content").and_then(Value::as_array) {
let types = content
.iter()
.filter_map(|part| part.get("type").and_then(Value::as_str))
.map(|value| Value::String(value.to_string()))
.collect::<Vec<_>>();
map.insert("content_types".to_string(), Value::Array(types));
}
Value::Object(map)
}
fn normalize_part(part: &Value) -> Value {
fn normalize_session_end(data: &Value) -> Value {
let mut map = Map::new();
if let Some(part_type) = part.get("type").and_then(Value::as_str) {
map.insert("type".to_string(), Value::String(part_type.to_string()));
if let Some(reason) = data.get("reason").and_then(Value::as_str) {
map.insert("reason".to_string(), Value::String(reason.to_string()));
}
if let Some(name) = part.get("name").and_then(Value::as_str) {
map.insert("name".to_string(), Value::String(name.to_string()));
}
if part.get("text").is_some() {
map.insert("text".to_string(), Value::String("<redacted>".to_string()));
}
if part.get("input").is_some() {
map.insert("input".to_string(), Value::Bool(true));
}
if part.get("output").is_some() {
map.insert("output".to_string(), Value::Bool(true));
}
Value::Object(map)
}
fn normalize_started(started: &Value) -> Value {
let mut map = Map::new();
if let Some(message) = started.get("message").and_then(Value::as_str) {
map.insert("message".to_string(), Value::String(message.to_string()));
if let Some(terminated_by) = data.get("terminated_by").and_then(Value::as_str) {
map.insert("terminated_by".to_string(), Value::String(terminated_by.to_string()));
}
Value::Object(map)
}
fn normalize_error(error: &Value) -> Value {
let mut map = Map::new();
if let Some(kind) = error.get("kind").and_then(Value::as_str) {
map.insert("kind".to_string(), Value::String(kind.to_string()));
if let Some(code) = error.get("code").and_then(Value::as_str) {
map.insert("code".to_string(), Value::String(code.to_string()));
}
if let Some(message) = error.get("message").and_then(Value::as_str) {
map.insert("message".to_string(), Value::String(message.to_string()));
@ -441,22 +504,28 @@ fn normalize_error(error: &Value) -> Value {
fn normalize_question(question: &Value) -> Value {
let mut map = Map::new();
if question.get("id").is_some() {
if question.get("question_id").is_some() {
map.insert("id".to_string(), Value::String("<redacted>".to_string()));
}
if let Some(questions) = question.get("questions").and_then(Value::as_array) {
map.insert("count".to_string(), Value::Number(questions.len().into()));
if let Some(options) = question.get("options").and_then(Value::as_array) {
map.insert("options".to_string(), Value::Number(options.len().into()));
}
if let Some(status) = question.get("status").and_then(Value::as_str) {
map.insert("status".to_string(), Value::String(status.to_string()));
}
Value::Object(map)
}
fn normalize_permission(permission: &Value) -> Value {
let mut map = Map::new();
if permission.get("id").is_some() {
if permission.get("permission_id").is_some() {
map.insert("id".to_string(), Value::String("<redacted>".to_string()));
}
if let Some(value) = permission.get("permission").and_then(Value::as_str) {
map.insert("permission".to_string(), Value::String(value.to_string()));
if let Some(value) = permission.get("action").and_then(Value::as_str) {
map.insert("action".to_string(), Value::String(value.to_string()));
}
if let Some(status) = permission.get("status").and_then(Value::as_str) {
map.insert("status".to_string(), Value::String(status.to_string()));
}
Value::Object(map)
}
@ -538,8 +607,8 @@ fn normalize_create_session(value: &Value) -> Value {
if let Some(healthy) = value.get("healthy").and_then(Value::as_bool) {
map.insert("healthy".to_string(), Value::Bool(healthy));
}
if value.get("agentSessionId").is_some() {
map.insert("agentSessionId".to_string(), Value::String("<redacted>".to_string()));
if value.get("nativeSessionId").is_some() {
map.insert("nativeSessionId".to_string(), Value::String("<redacted>".to_string()));
}
if let Some(error) = value.get("error") {
map.insert("error".to_string(), error.clone());
@ -611,7 +680,7 @@ where
if !new_events.is_empty() {
if let Some(last) = new_events
.last()
.and_then(|event| event.get("id"))
.and_then(|event| event.get("sequence"))
.and_then(Value::as_u64)
{
offset = last;
@ -631,9 +700,11 @@ fn find_permission_id(events: &[Value]) -> Option<String> {
.iter()
.find_map(|event| {
event
.get("data")
.and_then(|data| data.get("permissionAsked"))
.and_then(|permission| permission.get("id"))
.get("type")
.and_then(Value::as_str)
.filter(|value| *value == "permission.requested")
.and_then(|_| event.get("data"))
.and_then(|data| data.get("permission_id"))
.and_then(Value::as_str)
.map(|id| id.to_string())
})
@ -641,31 +712,23 @@ fn find_permission_id(events: &[Value]) -> Option<String> {
fn find_question_id_and_answers(events: &[Value]) -> Option<(String, Vec<Vec<String>>)> {
let question = events.iter().find_map(|event| {
event
.get("data")
.and_then(|data| data.get("questionAsked"))
.cloned()
let event_type = event.get("type").and_then(Value::as_str)?;
if event_type != "question.requested" {
return None;
}
event.get("data").cloned()
})?;
let id = question.get("id").and_then(Value::as_str)?.to_string();
let questions = question
.get("questions")
let id = question.get("question_id").and_then(Value::as_str)?.to_string();
let options = question
.get("options")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
let mut answers = Vec::new();
for question in questions {
let option = question
.get("options")
.and_then(Value::as_array)
.and_then(|options| options.first())
.and_then(|option| option.get("label"))
.and_then(Value::as_str)
.map(|label| label.to_string());
if let Some(label) = option {
answers.push(vec![label]);
} else {
answers.push(Vec::new());
}
if let Some(option) = options.first().and_then(Value::as_str) {
answers.push(vec![option.to_string()]);
} else {
answers.push(Vec::new());
}
Some((id, answers))
}
@ -1039,6 +1102,7 @@ async fn approval_flow_snapshots() {
|events| find_question_id_and_answers(events).is_some() || should_stop(events),
)
.await;
let question_events = truncate_question_events(&question_events);
insta::with_settings!({
snapshot_suffix => snapshot_name("question_reply_events", Some(config.agent)),
}, {
@ -1100,6 +1164,7 @@ async fn approval_flow_snapshots() {
|events| find_question_id_and_answers(events).is_some() || should_stop(events),
)
.await;
let reject_events = truncate_question_events(&reject_events);
insta::with_settings!({
snapshot_suffix => snapshot_name("question_reject_events", Some(config.agent)),
}, {

View file

@ -2,8 +2,8 @@ use axum::body::Body;
use axum::http::{Request, StatusCode};
use http_body_util::BodyExt;
use sandbox_agent_agent_management::agents::AgentManager;
use sandbox_agent_core::router::{build_router, AppState, AuthConfig};
use sandbox_agent_core::ui;
use sandbox_agent::router::{build_router, AppState, AuthConfig};
use sandbox_agent::ui;
use tempfile::TempDir;
use tower::util::ServiceExt;

View file

@ -1,22 +1,45 @@
---
source: server/packages/sandbox-agent/tests/http_sse_snapshots.rs
assertion_line: 1025
expression: normalize_events(&permission_events)
---
- agent: claude
kind: started
- metadata: true
seq: 1
started:
message: session.created
- agent: claude
kind: started
session: started
source: daemon
synthetic: true
type: session.started
- metadata: true
seq: 2
started:
message: system.init
- agent: claude
kind: message
message:
parts:
- text: "<redacted>"
type: text
session: started
source: agent
synthetic: false
type: session.started
- item:
content_types:
- text
kind: message
role: assistant
status: in_progress
seq: 3
source: daemon
synthetic: true
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 4
source: daemon
synthetic: true
type: item.delta
- item:
content_types:
- text
kind: message
role: assistant
status: completed
seq: 5
source: agent
synthetic: false
type: item.completed

View file

@ -1,22 +1,45 @@
---
source: server/packages/sandbox-agent/tests/http_sse_snapshots.rs
assertion_line: 1151
expression: normalize_events(&reject_events)
---
- agent: claude
kind: started
- metadata: true
seq: 1
started:
message: session.created
- agent: claude
kind: started
session: started
source: daemon
synthetic: true
type: session.started
- metadata: true
seq: 2
started:
message: system.init
- agent: claude
kind: message
message:
parts:
- text: "<redacted>"
type: text
session: started
source: agent
synthetic: false
type: session.started
- item:
content_types:
- text
kind: message
role: assistant
status: in_progress
seq: 3
source: daemon
synthetic: true
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 4
source: daemon
synthetic: true
type: item.delta
- item:
content_types:
- text
kind: message
role: assistant
status: completed
seq: 5
source: agent
synthetic: false
type: item.completed

View file

@ -1,31 +1,45 @@
---
source: server/packages/sandbox-agent/tests/http_sse_snapshots.rs
assertion_line: 1045
assertion_line: 1109
expression: normalize_events(&question_events)
---
- agent: claude
kind: started
- metadata: true
seq: 1
started:
message: session.created
- agent: claude
kind: started
session: started
source: daemon
synthetic: true
type: session.started
- metadata: true
seq: 2
started:
message: system.init
- agent: claude
kind: message
message:
parts:
- text: "<redacted>"
type: text
session: started
source: agent
synthetic: false
type: session.started
- item:
content_types:
- text
kind: message
role: assistant
status: in_progress
seq: 3
- agent: claude
kind: message
message:
parts:
- text: "<redacted>"
type: text
role: assistant
source: daemon
synthetic: true
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 4
source: daemon
synthetic: true
type: item.delta
- item:
content_types:
- text
kind: message
role: assistant
status: completed
seq: 5
source: agent
synthetic: false
type: item.completed

View file

@ -1,42 +1,87 @@
---
source: server/packages/sandbox-agent/tests/http_sse_snapshots.rs
assertion_line: 1259
expression: snapshot
---
session_a:
- agent: claude
kind: started
- metadata: true
seq: 1
started:
message: session.created
- agent: claude
kind: started
session: started
source: daemon
synthetic: true
type: session.started
- metadata: true
seq: 2
started:
message: system.init
- agent: claude
kind: message
message:
parts:
- text: "<redacted>"
type: text
session: started
source: agent
synthetic: false
type: session.started
- item:
content_types:
- text
kind: message
role: assistant
status: in_progress
seq: 3
source: daemon
synthetic: true
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 4
source: daemon
synthetic: true
type: item.delta
- item:
content_types:
- text
kind: message
role: assistant
status: completed
seq: 5
source: agent
synthetic: false
type: item.completed
session_b:
- agent: claude
kind: started
- metadata: true
seq: 1
started:
message: session.created
- agent: claude
kind: started
session: started
source: daemon
synthetic: true
type: session.started
- metadata: true
seq: 2
started:
message: system.init
- agent: claude
kind: message
message:
parts:
- text: "<redacted>"
type: text
session: started
source: agent
synthetic: false
type: session.started
- item:
content_types:
- text
kind: message
role: assistant
status: in_progress
seq: 3
source: daemon
synthetic: true
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 4
source: daemon
synthetic: true
type: item.delta
- item:
content_types:
- text
kind: message
role: assistant
status: completed
seq: 5
source: agent
synthetic: false
type: item.completed

View file

@ -1,22 +1,45 @@
---
source: server/packages/sandbox-agent/tests/http_sse_snapshots.rs
assertion_line: 742
expression: normalized
---
- agent: claude
kind: started
- metadata: true
seq: 1
started:
message: session.created
- agent: claude
kind: started
session: started
source: daemon
synthetic: true
type: session.started
- metadata: true
seq: 2
started:
message: system.init
- agent: claude
kind: message
message:
parts:
- text: "<redacted>"
type: text
session: started
source: agent
synthetic: false
type: session.started
- item:
content_types:
- text
kind: message
role: assistant
status: in_progress
seq: 3
source: daemon
synthetic: true
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 4
source: daemon
synthetic: true
type: item.delta
- item:
content_types:
- text
kind: message
role: assistant
status: completed
seq: 5
source: agent
synthetic: false
type: item.completed

View file

@ -1,22 +1,45 @@
---
source: server/packages/sandbox-agent/tests/http_sse_snapshots.rs
assertion_line: 775
expression: normalized
---
- agent: claude
kind: started
- metadata: true
seq: 1
started:
message: session.created
- agent: claude
kind: started
session: started
source: daemon
synthetic: true
type: session.started
- metadata: true
seq: 2
started:
message: system.init
- agent: claude
kind: message
message:
parts:
- text: "<redacted>"
type: text
session: started
source: agent
synthetic: false
type: session.started
- item:
content_types:
- text
kind: message
role: assistant
status: in_progress
seq: 3
source: daemon
synthetic: true
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 4
source: daemon
synthetic: true
type: item.delta
- item:
content_types:
- text
kind: message
role: assistant
status: completed
seq: 5
source: agent
synthetic: false
type: item.completed