fix: add native turn lifecycle and stabilize opencode session flow

This commit is contained in:
Nathan Flurry 2026-02-07 20:24:21 -08:00
parent 2b0507c3f5
commit 91cac052b8
35 changed files with 1688 additions and 486 deletions

View file

@ -605,8 +605,22 @@ fn run_opencode(cli: &CliConfig, args: &OpencodeArgs) -> Result<(), CliError> {
let token = cli.token.clone();
let base_url = format!("http://{}:{}", args.host, args.port);
let has_proxy_env = std::env::var_os("HTTP_PROXY").is_some()
|| std::env::var_os("http_proxy").is_some()
|| std::env::var_os("HTTPS_PROXY").is_some()
|| std::env::var_os("https_proxy").is_some();
let has_no_proxy_env =
std::env::var_os("NO_PROXY").is_some() || std::env::var_os("no_proxy").is_some();
write_stderr_line(&format!(
"gigacode startup: ensuring daemon at {base_url} (token: {}, proxy env: {}, no_proxy env: {})",
if token.is_some() { "set" } else { "unset" },
if has_proxy_env { "set" } else { "unset" },
if has_no_proxy_env { "set" } else { "unset" }
))?;
crate::daemon::ensure_running(cli, &args.host, args.port, token.as_deref())?;
write_stderr_line("gigacode startup: daemon is healthy")?;
write_stderr_line("gigacode startup: creating OpenCode session via /opencode/session")?;
let session_id = create_opencode_session(
&base_url,
token.as_deref(),
@ -616,7 +630,12 @@ fn run_opencode(cli: &CliConfig, args: &OpencodeArgs) -> Result<(), CliError> {
write_stdout_line(&format!("OpenCode session: {session_id}"))?;
let attach_url = format!("{base_url}/opencode");
write_stderr_line("gigacode startup: resolving OpenCode binary (installing if needed)")?;
let opencode_bin = resolve_opencode_bin()?;
write_stderr_line(&format!(
"gigacode startup: launching OpenCode attach using {}",
opencode_bin.display()
))?;
let mut opencode_cmd = ProcessCommand::new(opencode_bin);
opencode_cmd
.arg("attach")

View file

@ -13,6 +13,8 @@ mod build_id {
pub use build_id::BUILD_ID;
const DAEMON_HEALTH_TIMEOUT: Duration = Duration::from_secs(30);
const HEALTH_CHECK_CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
const HEALTH_CHECK_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
// ---------------------------------------------------------------------------
// Paths
@ -143,16 +145,40 @@ pub fn is_process_running(pid: u32) -> bool {
// ---------------------------------------------------------------------------
pub fn check_health(base_url: &str, token: Option<&str>) -> Result<bool, CliError> {
let client = HttpClient::builder().build()?;
let url = format!("{base_url}/v1/health");
let started_at = Instant::now();
let client = HttpClient::builder()
.connect_timeout(HEALTH_CHECK_CONNECT_TIMEOUT)
.timeout(HEALTH_CHECK_REQUEST_TIMEOUT)
.build()?;
let mut request = client.get(url);
if let Some(token) = token {
request = request.bearer_auth(token);
}
match request.send() {
Ok(response) if response.status().is_success() => Ok(true),
Ok(_) => Ok(false),
Err(_) => Ok(false),
Ok(response) if response.status().is_success() => {
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
"daemon health check succeeded"
);
Ok(true)
}
Ok(response) => {
tracing::warn!(
status = %response.status(),
elapsed_ms = started_at.elapsed().as_millis(),
"daemon health check returned non-success status"
);
Ok(false)
}
Err(err) => {
tracing::warn!(
error = %err,
elapsed_ms = started_at.elapsed().as_millis(),
"daemon health check request failed"
);
Ok(false)
}
}
}
@ -162,10 +188,15 @@ pub fn wait_for_health(
token: Option<&str>,
timeout: Duration,
) -> Result<(), CliError> {
let client = HttpClient::builder().build()?;
let client = HttpClient::builder()
.connect_timeout(HEALTH_CHECK_CONNECT_TIMEOUT)
.timeout(HEALTH_CHECK_REQUEST_TIMEOUT)
.build()?;
let deadline = Instant::now() + timeout;
let mut attempts: u32 = 0;
while Instant::now() < deadline {
attempts += 1;
if let Some(child) = server_child.as_mut() {
if let Some(status) = child.try_wait()? {
return Err(CliError::Server(format!(
@ -180,13 +211,43 @@ pub fn wait_for_health(
request = request.bearer_auth(token);
}
match request.send() {
Ok(response) if response.status().is_success() => return Ok(()),
_ => {
Ok(response) if response.status().is_success() => {
tracing::info!(
attempts,
elapsed_ms =
(timeout - deadline.saturating_duration_since(Instant::now())).as_millis(),
"daemon became healthy while waiting"
);
return Ok(());
}
Ok(response) => {
if attempts % 10 == 0 {
tracing::info!(
attempts,
status = %response.status(),
"daemon still not healthy; waiting"
);
}
std::thread::sleep(Duration::from_millis(200));
}
Err(err) => {
if attempts % 10 == 0 {
tracing::warn!(
attempts,
error = %err,
"daemon health poll request failed; still waiting"
);
}
std::thread::sleep(Duration::from_millis(200));
}
}
}
tracing::error!(
attempts,
timeout_ms = timeout.as_millis(),
"timed out waiting for daemon health"
);
Err(CliError::Server(
"timed out waiting for sandbox-agent health".to_string(),
))
@ -197,7 +258,7 @@ pub fn wait_for_health(
// ---------------------------------------------------------------------------
pub fn spawn_sandbox_agent_daemon(
cli: &CliConfig,
_cli: &CliConfig,
host: &str,
port: u16,
token: Option<&str>,
@ -478,6 +539,10 @@ pub fn ensure_running(
) -> Result<(), CliError> {
let base_url = format!("http://{host}:{port}");
let pid_path = daemon_pid_path(host, port);
eprintln!(
"checking daemon health at {base_url} (token: {})...",
if token.is_some() { "set" } else { "unset" }
);
// Check if daemon is already healthy
if check_health(&base_url, token)? {

View file

@ -256,6 +256,7 @@ impl OpenCodeQuestionRecord {
#[derive(Default, Clone)]
struct OpenCodeSessionRuntime {
turn_in_progress: bool,
last_user_message_id: Option<String>,
active_assistant_message_id: Option<String>,
last_agent: Option<String>,
@ -277,6 +278,10 @@ struct OpenCodeSessionRuntime {
open_tool_calls: HashSet<String>,
/// Assistant messages that have streamed text deltas.
messages_with_text_deltas: HashSet<String>,
/// Item IDs (native and normalized) known to be user messages.
user_item_ids: HashSet<String>,
/// Item IDs (native and normalized) that should not emit text deltas.
non_text_item_ids: HashSet<String>,
}
#[derive(Clone, Debug)]
@ -512,29 +517,83 @@ async fn ensure_backing_session(
let request = CreateSessionRequest {
agent: agent.to_string(),
agent_mode: None,
permission_mode,
permission_mode: permission_mode.clone(),
model: model.clone(),
variant: variant.clone(),
agent_version: None,
directory,
title,
};
match state
.inner
.session_manager()
.create_session(session_id.to_string(), request)
let manager = state.inner.session_manager();
match manager
.create_session(session_id.to_string(), request.clone())
.await
{
Ok(_) => Ok(()),
Err(SandboxError::SessionAlreadyExists { .. }) => state
.inner
.session_manager()
.set_session_overrides(session_id, model, variant)
.await
.or_else(|err| match err {
SandboxError::SessionNotFound { .. } => Ok(()),
other => Err(other),
}),
Err(SandboxError::SessionAlreadyExists { .. }) => {
let should_recreate = manager
.get_session_info(session_id)
.await
.map(|info| info.agent != agent && info.event_count <= 1)
.unwrap_or(false);
if should_recreate {
manager.delete_session(session_id).await?;
match manager
.create_session(session_id.to_string(), request.clone())
.await
{
Ok(_) => Ok(()),
Err(SandboxError::SessionAlreadyExists { .. }) => {
match manager
.set_session_overrides(session_id, model.clone(), variant.clone())
.await
{
Ok(()) => Ok(()),
Err(SandboxError::SessionNotFound { .. }) => {
tracing::warn!(
target = "sandbox_agent::opencode",
session_id,
"backing session vanished while applying overrides; retrying create_session"
);
match manager
.create_session(session_id.to_string(), request.clone())
.await
{
Ok(_) | Err(SandboxError::SessionAlreadyExists { .. }) => {
Ok(())
}
Err(err) => Err(err),
}
}
Err(other) => Err(other),
}
}
Err(err) => Err(err),
}
} else {
match manager
.set_session_overrides(session_id, model.clone(), variant.clone())
.await
{
Ok(()) => Ok(()),
Err(SandboxError::SessionNotFound { .. }) => {
tracing::warn!(
target = "sandbox_agent::opencode",
session_id,
"backing session missing while setting overrides; retrying create_session"
);
match manager
.create_session(session_id.to_string(), request.clone())
.await
{
Ok(_) | Err(SandboxError::SessionAlreadyExists { .. }) => Ok(()),
Err(err) => Err(err),
}
}
Err(other) => Err(other),
}
}
}
Err(err) => Err(err),
}
}
@ -596,6 +655,13 @@ struct OpenCodeCreateSessionRequest {
permission: Option<Value>,
#[serde(alias = "permission_mode")]
permission_mode: Option<String>,
#[schema(value_type = String)]
model: Option<Value>,
#[serde(rename = "providerID")]
provider_id: Option<String>,
#[serde(rename = "modelID")]
model_id: Option<String>,
variant: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
@ -687,6 +753,17 @@ struct SessionSummarizeRequest {
auto: Option<bool>,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
struct SessionInitRequest {
#[serde(rename = "providerID")]
provider_id: Option<String>,
#[serde(rename = "modelID")]
model_id: Option<String>,
#[serde(rename = "messageID")]
message_id: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
struct PermissionReplyRequest {
response: Option<String>,
@ -1002,13 +1079,16 @@ async fn resolve_session_agent(
) -> (String, String, String) {
let cache = opencode_model_cache(state).await;
let default_model_id = cache.default_model.clone();
let mut provider_id = requested_provider
let requested_provider = requested_provider
.filter(|value| !value.is_empty())
.filter(|value| *value != "sandbox-agent")
.map(|value| value.to_string());
let model_id = requested_model
let requested_model = requested_model
.filter(|value| !value.is_empty())
.map(|value| value.to_string());
let explicit_selection = requested_provider.is_some() || requested_model.is_some();
let mut provider_id = requested_provider.clone();
let model_id = requested_model.clone();
if provider_id.is_none() {
if let Some(model_value) = model_id.as_deref() {
if let Some(entry) = cache
@ -1041,7 +1121,7 @@ async fn resolve_session_agent(
state
.opencode
.update_runtime(session_id, |runtime| {
if runtime.session_agent_id.is_none() {
if runtime.session_agent_id.is_none() || explicit_selection {
let agent = resolved_agent.unwrap_or_else(default_agent_id);
runtime.session_agent_id = Some(agent.as_str().to_string());
runtime.session_provider_id = Some(provider_id.clone());
@ -1527,6 +1607,61 @@ fn unique_assistant_message_id(
}
}
fn set_item_text_delta_capability(
runtime: &mut OpenCodeSessionRuntime,
item_id: Option<&str>,
native_item_id: Option<&str>,
supports_text_deltas: bool,
) {
for key in [item_id, native_item_id].into_iter().flatten() {
if supports_text_deltas {
runtime.non_text_item_ids.remove(key);
} else {
runtime.non_text_item_ids.insert(key.to_string());
}
}
}
fn item_delta_is_non_text(
runtime: &OpenCodeSessionRuntime,
item_id: Option<&str>,
native_item_id: Option<&str>,
) -> bool {
[item_id, native_item_id]
.into_iter()
.flatten()
.any(|key| runtime.non_text_item_ids.contains(key))
}
fn item_supports_text_deltas(item: &UniversalItem) -> bool {
if item.kind != ItemKind::Message {
return false;
}
if !matches!(item.role.as_ref(), Some(ItemRole::Assistant)) {
return false;
}
if item.content.is_empty() {
return true;
}
item.content
.iter()
.any(|part| matches!(part, ContentPart::Text { .. }))
}
fn extract_message_text_from_content(parts: &[ContentPart]) -> Option<String> {
let mut text = String::new();
for part in parts {
if let ContentPart::Text { text: chunk } = part {
text.push_str(chunk);
}
}
if text.is_empty() {
None
} else {
Some(text)
}
}
fn extract_text_from_content(parts: &[ContentPart]) -> Option<String> {
let mut text = String::new();
for part in parts {
@ -1890,43 +2025,77 @@ fn patterns_from_metadata(metadata: &Option<Value>) -> Vec<String> {
patterns
}
fn turn_error_from_metadata(metadata: &Option<Value>) -> Option<(String, Option<Value>)> {
let error = metadata.as_ref()?.get("error")?;
let message = error
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("Turn failed")
.to_string();
Some((message, Some(error.clone())))
}
async fn apply_universal_event(state: Arc<OpenCodeAppState>, event: UniversalEvent) {
match event.event_type {
UniversalEventType::ItemStarted | UniversalEventType::ItemCompleted => {
if let UniversalEventData::Item(ItemEventData { item }) = &event.data {
// turn.completed or session.idle status → emit session.idle
if event.event_type == UniversalEventType::ItemCompleted
&& item.kind == ItemKind::Status
{
if let Some(ContentPart::Status { label, .. }) = item.content.first() {
if label == "turn.completed" || label == "session.idle" {
let runtime = state
.opencode
.update_runtime(&event.session_id, |runtime| {
if runtime.open_tool_calls.is_empty() {
runtime.active_assistant_message_id = None;
}
})
.await;
if !runtime.open_tool_calls.is_empty() {
return;
}
let session_id = event.session_id.clone();
state.opencode.emit_event(json!({
"type": "session.status",
"properties": {"sessionID": session_id, "status": {"type": "idle"}}
}));
state.opencode.emit_event(json!({
"type": "session.idle",
"properties": {"sessionID": session_id}
}));
return;
}
}
}
apply_item_event(state, event.clone(), item.clone()).await;
}
}
UniversalEventType::TurnStarted => {
state
.opencode
.update_runtime(&event.session_id, |runtime| {
runtime.turn_in_progress = true;
})
.await;
let session_id = event.session_id.clone();
state.opencode.emit_event(json!({
"type": "session.status",
"properties": {"sessionID": session_id, "status": {"type": "busy"}}
}));
}
UniversalEventType::TurnEnded => {
let turn_data = match &event.data {
UniversalEventData::Turn(data) => Some(data.clone()),
_ => None,
};
let mut should_emit_idle = false;
let runtime = state
.opencode
.update_runtime(&event.session_id, |runtime| {
let was_turn_in_progress = runtime.turn_in_progress;
if runtime.open_tool_calls.is_empty() {
runtime.active_assistant_message_id = None;
runtime.turn_in_progress = false;
should_emit_idle = was_turn_in_progress;
} else {
runtime.turn_in_progress = true;
should_emit_idle = false;
}
})
.await;
if !runtime.open_tool_calls.is_empty() {
return;
}
if let Some(turn_data) = turn_data {
if let Some((message, details)) = turn_error_from_metadata(&turn_data.metadata) {
emit_session_error(&state.opencode, &event.session_id, &message, None, details);
}
}
if !should_emit_idle {
return;
}
let session_id = event.session_id.clone();
state.opencode.emit_event(json!({
"type": "session.status",
"properties": {"sessionID": session_id, "status": {"type": "idle"}}
}));
state.opencode.emit_event(json!({
"type": "session.idle",
"properties": {"sessionID": session_id}
}));
}
UniversalEventType::ItemDelta => {
if let UniversalEventData::ItemDelta(ItemDeltaData {
item_id,
@ -1945,6 +2114,13 @@ async fn apply_universal_event(state: Arc<OpenCodeAppState>, event: UniversalEve
}
}
UniversalEventType::SessionEnded => {
state
.opencode
.update_runtime(&event.session_id, |runtime| {
runtime.turn_in_progress = false;
runtime.active_assistant_message_id = None;
})
.await;
let session_id = event.session_id.clone();
state.opencode.emit_event(json!({
"type": "session.status",
@ -1968,6 +2144,16 @@ async fn apply_universal_event(state: Arc<OpenCodeAppState>, event: UniversalEve
UniversalEventType::Error => {
if let UniversalEventData::Error(error) = &event.data {
let session_id = event.session_id.clone();
let mut should_emit_idle = false;
state
.opencode
.update_runtime(&session_id, |runtime| {
let was_turn_in_progress = runtime.turn_in_progress;
runtime.turn_in_progress = false;
runtime.active_assistant_message_id = None;
should_emit_idle = was_turn_in_progress;
})
.await;
emit_session_error(
&state.opencode,
&session_id,
@ -1975,7 +2161,9 @@ async fn apply_universal_event(state: Arc<OpenCodeAppState>, event: UniversalEve
error.code.as_deref(),
error.details.clone(),
);
emit_session_idle(&state.opencode, &session_id);
if should_emit_idle {
emit_session_idle(&state.opencode, &session_id);
}
}
}
_ => {}
@ -2111,16 +2299,6 @@ async fn apply_item_event(
event: UniversalEvent,
item: UniversalItem,
) {
if matches!(item.kind, ItemKind::ToolCall | ItemKind::ToolResult) {
apply_tool_item_event(state, event, item).await;
return;
}
if item.kind != ItemKind::Message {
return;
}
if matches!(item.role, Some(ItemRole::User)) {
return;
}
let session_id = event.session_id.clone();
let item_id_key = if item.item_id.is_empty() {
None
@ -2128,6 +2306,38 @@ async fn apply_item_event(
Some(item.item_id.clone())
};
let native_id_key = item.native_item_id.clone();
let supports_text_deltas = item_supports_text_deltas(&item);
let is_user_item = matches!(item.role.as_ref(), Some(ItemRole::User));
let _ = state
.opencode
.update_runtime(&session_id, |runtime| {
set_item_text_delta_capability(
runtime,
item_id_key.as_deref(),
native_id_key.as_deref(),
supports_text_deltas,
);
if is_user_item {
if let Some(item_key) = item_id_key.as_ref() {
runtime.user_item_ids.insert(item_key.clone());
}
if let Some(native_key) = native_id_key.as_ref() {
runtime.user_item_ids.insert(native_key.clone());
}
}
})
.await;
if matches!(item.kind, ItemKind::ToolCall | ItemKind::ToolResult) {
apply_tool_item_event(state, event, item).await;
return;
}
if item.kind != ItemKind::Message {
return;
}
if is_user_item {
return;
}
let mut message_id: Option<String> = None;
let mut parent_id: Option<String> = None;
let runtime = state
@ -2146,6 +2356,7 @@ async fn apply_item_event(
.clone()
.and_then(|key| runtime.message_id_for_item.get(&key).cloned())
})
.or_else(|| runtime.active_assistant_message_id.clone())
{
message_id = Some(existing);
} else {
@ -2216,7 +2427,7 @@ async fn apply_item_event(
})
.await;
if let Some(text) = extract_text_from_content(&item.content) {
if let Some(text) = extract_message_text_from_content(&item.content) {
if event.event_type == UniversalEventType::ItemStarted {
// Reset streaming text state for a new assistant item.
let _ = state
@ -2677,22 +2888,35 @@ async fn apply_item_delta(
Some(item_id)
};
let native_id_key = native_item_id;
let is_user_delta = item_id_key
.as_ref()
.map(|value| value.starts_with("user_"))
.unwrap_or(false)
|| native_id_key
.as_ref()
.map(|value| value.starts_with("user_"))
.unwrap_or(false);
if is_user_delta {
return;
}
let mut message_id: Option<String> = None;
let mut parent_id: Option<String> = None;
let mut is_user_delta = false;
let mut suppress_non_text_delta = false;
let runtime = state
.opencode
.update_runtime(&session_id, |runtime| {
if item_delta_is_non_text(runtime, item_id_key.as_deref(), native_id_key.as_deref()) {
suppress_non_text_delta = true;
return;
}
let is_user_from_runtime = item_id_key
.as_ref()
.is_some_and(|value| runtime.user_item_ids.contains(value))
|| native_id_key
.as_ref()
.is_some_and(|value| runtime.user_item_ids.contains(value));
let is_user_from_prefix = item_id_key
.as_ref()
.map(|value| value.starts_with("user_"))
.unwrap_or(false)
|| native_id_key
.as_ref()
.map(|value| value.starts_with("user_"))
.unwrap_or(false);
if is_user_from_runtime || is_user_from_prefix {
is_user_delta = true;
return;
}
parent_id = runtime.last_user_message_id.clone();
if let Some(existing) = item_id_key
.clone()
@ -2720,6 +2944,9 @@ async fn apply_item_delta(
}
})
.await;
if is_user_delta || suppress_non_text_delta {
return;
}
let message_id = message_id.unwrap_or_else(|| {
unique_assistant_message_id(&runtime, parent_id.as_ref(), event.sequence)
});
@ -3494,6 +3721,10 @@ async fn oc_session_create(
parent_id: None,
permission: None,
permission_mode: None,
model: None,
provider_id: None,
model_id: None,
variant: None,
});
let directory = state
.opencode
@ -3502,7 +3733,19 @@ async fn oc_session_create(
let id = next_id("ses_", &SESSION_COUNTER);
let slug = format!("session-{}", id);
let title = body.title.unwrap_or_else(|| format!("Session {}", id));
let permission_mode = body.permission_mode;
let permission_mode = body.permission_mode.clone();
let requested_provider = body
.model
.as_ref()
.and_then(|v| v.get("providerID"))
.and_then(|v| v.as_str())
.or(body.provider_id.as_deref());
let requested_model = body
.model
.as_ref()
.and_then(|v| v.get("modelID"))
.and_then(|v| v.as_str())
.or(body.model_id.as_deref());
let record = OpenCodeSessionRecord {
id: id.clone(),
slug,
@ -3514,7 +3757,7 @@ async fn oc_session_create(
created_at: now,
updated_at: now,
share_url: None,
permission_mode,
permission_mode: permission_mode.clone(),
};
let session_value = record.to_value();
@ -3523,11 +3766,32 @@ async fn oc_session_create(
sessions.insert(id.clone(), record);
drop(sessions);
let (session_agent, provider_id, model_id) =
resolve_session_agent(&state, &id, requested_provider, requested_model).await;
let session_agent_id = AgentId::parse(&session_agent).unwrap_or_else(default_agent_id);
let backing_model = backing_model_for_agent(session_agent_id, &provider_id, &model_id);
let backing_variant = body.variant.clone();
if let Err(err) = ensure_backing_session(
&state,
&id,
&session_agent,
backing_model,
backing_variant,
permission_mode,
)
.await
{
let mut sessions = state.opencode.sessions.lock().await;
sessions.remove(&id);
drop(sessions);
return sandbox_error_response(err).into_response();
}
state
.opencode
.emit_event(session_event("session.created", &session_value));
(StatusCode::OK, Json(session_value))
(StatusCode::OK, Json(session_value)).into_response()
}
#[utoipa::path(
@ -3591,6 +3855,14 @@ async fn oc_session_update(
let mut sessions = state.opencode.sessions.lock().await;
if let Some(session) = sessions.get_mut(&session_id) {
if let Some(title) = body.title {
if let Err(err) = state
.inner
.session_manager()
.set_session_title(&session_id, title.clone())
.await
{
return sandbox_error_response(err).into_response();
}
session.title = title;
session.updated_at = state.opencode.now_ms();
}
@ -3616,6 +3888,15 @@ async fn oc_session_delete(
) -> impl IntoResponse {
let mut sessions = state.opencode.sessions.lock().await;
if let Some(session) = sessions.remove(&session_id) {
drop(sessions);
if let Err(err) = state
.inner
.session_manager()
.delete_session(&session_id)
.await
{
return sandbox_error_response(err).into_response();
}
state
.opencode
.emit_event(session_event("session.deleted", &session.to_value()));
@ -3632,9 +3913,18 @@ async fn oc_session_delete(
)]
async fn oc_session_status(State(state): State<Arc<OpenCodeAppState>>) -> impl IntoResponse {
let sessions = state.inner.session_manager().list_sessions().await;
let runtimes = state.opencode.session_runtime.lock().await;
let mut status_map = serde_json::Map::new();
for s in &sessions {
let status = if s.ended { "idle" } else { "busy" };
let status = if runtimes
.get(&s.session_id)
.map(|runtime| runtime.turn_in_progress)
.unwrap_or(false)
{
"busy"
} else {
"idle"
};
status_map.insert(s.session_id.clone(), json!({"type": status}));
}
(StatusCode::OK, Json(Value::Object(status_map)))
@ -3669,11 +3959,61 @@ async fn oc_session_children() -> impl IntoResponse {
post,
path = "/session/{sessionID}/init",
params(("sessionID" = String, Path, description = "Session ID")),
request_body = SessionInitRequest,
responses((status = 200)),
tag = "opencode"
)]
async fn oc_session_init() -> impl IntoResponse {
bool_ok(true)
async fn oc_session_init(
State(state): State<Arc<OpenCodeAppState>>,
Path(session_id): Path<String>,
headers: HeaderMap,
Query(query): Query<DirectoryQuery>,
body: Option<Json<SessionInitRequest>>,
) -> impl IntoResponse {
let directory = state
.opencode
.directory_for(&headers, query.directory.as_ref());
let _ = state.opencode.ensure_session(&session_id, directory).await;
let body = body.map(|json| json.0).unwrap_or(SessionInitRequest {
provider_id: None,
model_id: None,
message_id: None,
});
let requested_provider = body
.provider_id
.as_deref()
.filter(|value| !value.is_empty());
let requested_model = body.model_id.as_deref().filter(|value| !value.is_empty());
if requested_provider.is_none() && requested_model.is_none() {
return bool_ok(true).into_response();
}
if requested_provider.is_none() || requested_model.is_none() {
return bad_request("providerID and modelID are required when selecting a model")
.into_response();
}
let (session_agent, provider_id, model_id) =
resolve_session_agent(&state, &session_id, requested_provider, requested_model).await;
let session_agent_id = AgentId::parse(&session_agent).unwrap_or_else(default_agent_id);
let backing_model = backing_model_for_agent(session_agent_id, &provider_id, &model_id);
let session_permission_mode = {
let sessions = state.opencode.sessions.lock().await;
sessions
.get(&session_id)
.and_then(|s| s.permission_mode.clone())
};
if let Err(err) = ensure_backing_session(
&state,
&session_id,
&session_agent,
backing_model,
None,
session_permission_mode,
)
.await
{
return sandbox_error_response(err).into_response();
}
bool_ok(true).into_response()
}
#[utoipa::path(
@ -3877,6 +4217,7 @@ async fn oc_session_message_create(
let _ = state
.opencode
.update_runtime(&session_id, |runtime| {
runtime.turn_in_progress = true;
runtime.last_user_message_id = Some(user_message_id.clone());
runtime.active_assistant_message_id = None;
runtime.last_agent = Some(agent_mode.clone());
@ -3902,6 +4243,13 @@ async fn oc_session_message_create(
)
.await
{
let _ = state
.opencode
.update_runtime(&session_id, |runtime| {
runtime.turn_in_progress = false;
runtime.active_assistant_message_id = None;
})
.await;
tracing::warn!(
target = "sandbox_agent::opencode",
?err,
@ -3926,6 +4274,13 @@ async fn oc_session_message_create(
.send_message(session_id.clone(), prompt_text)
.await
{
let _ = state
.opencode
.update_runtime(&session_id, |runtime| {
runtime.turn_in_progress = false;
runtime.active_assistant_message_id = None;
})
.await;
tracing::warn!(
target = "sandbox_agent::opencode",
?err,
@ -5421,3 +5776,107 @@ async fn oc_tui_select_session(
tags((name = "opencode", description = "OpenCode compatibility API"))
)]
pub struct OpenCodeApiDoc;
#[cfg(test)]
mod tests {
use super::*;
use sandbox_agent_universal_agent_schema::ReasoningVisibility;
fn assistant_item(content: Vec<ContentPart>) -> UniversalItem {
UniversalItem {
item_id: "itm_assistant".to_string(),
native_item_id: Some("native_assistant".to_string()),
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content,
status: ItemStatus::InProgress,
}
}
#[test]
fn extract_message_text_ignores_non_text_parts() {
let parts = vec![
ContentPart::Status {
label: "Thinking".to_string(),
detail: Some("Preparing friendly brief response".to_string()),
},
ContentPart::Reasoning {
text: "Preparing friendly brief response".to_string(),
visibility: ReasoningVisibility::Public,
},
ContentPart::Text {
text: "Hey! How can I help?".to_string(),
},
ContentPart::Json {
json: serde_json::json!({"ignored": true}),
},
];
assert_eq!(
extract_message_text_from_content(&parts),
Some("Hey! How can I help?".to_string())
);
}
#[test]
fn item_supports_text_deltas_only_for_assistant_text_messages() {
assert!(item_supports_text_deltas(&assistant_item(Vec::new())));
assert!(item_supports_text_deltas(&assistant_item(vec![
ContentPart::Text {
text: "hello".to_string(),
}
])));
assert!(!item_supports_text_deltas(&assistant_item(vec![
ContentPart::Reasoning {
text: "internal".to_string(),
visibility: ReasoningVisibility::Private,
}
])));
let user = UniversalItem {
item_id: "itm_user".to_string(),
native_item_id: Some("native_user".to_string()),
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::User),
content: vec![ContentPart::Text {
text: "hello".to_string(),
}],
status: ItemStatus::InProgress,
};
assert!(!item_supports_text_deltas(&user));
let status = UniversalItem {
item_id: "itm_status".to_string(),
native_item_id: Some("native_status".to_string()),
parent_id: None,
kind: ItemKind::Status,
role: Some(ItemRole::Assistant),
content: vec![ContentPart::Status {
label: "thinking".to_string(),
detail: None,
}],
status: ItemStatus::InProgress,
};
assert!(!item_supports_text_deltas(&status));
}
#[test]
fn text_delta_capability_blocks_non_text_item_ids() {
let mut runtime = OpenCodeSessionRuntime::default();
set_item_text_delta_capability(&mut runtime, Some("itm_1"), Some("native_1"), false);
assert!(item_delta_is_non_text(
&runtime,
Some("itm_1"),
Some("native_1")
));
set_item_text_delta_capability(&mut runtime, Some("itm_1"), Some("native_1"), true);
assert!(!item_delta_is_non_text(
&runtime,
Some("itm_1"),
Some("native_1")
));
}
}

View file

@ -22,11 +22,12 @@ use reqwest::Client;
use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError};
use sandbox_agent_universal_agent_schema::{
codex as codex_schema, convert_amp, convert_claude, convert_codex, convert_opencode,
turn_completed_event, AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource,
FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData,
PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason,
SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent,
UniversalEventData, UniversalEventType, UniversalItem,
turn_ended_event, turn_started_event, AgentUnparsedData, ContentPart, ErrorData,
EventConversion, EventSource, FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole,
ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus,
ReasoningVisibility, SessionEndReason, SessionEndedData, SessionStartedData, StderrOutput,
TerminatedBy, TurnEventData, TurnPhase, UniversalEvent, UniversalEventData, UniversalEventType,
UniversalItem,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@ -336,6 +337,8 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
EventSource,
SessionStartedData,
SessionEndedData,
TurnEventData,
TurnPhase,
SessionEndReason,
TerminatedBy,
StderrOutput,
@ -648,6 +651,7 @@ impl SessionState {
}
if conversion.event_type == UniversalEventType::ItemCompleted
&& data.item.kind == ItemKind::Message
&& !matches!(data.item.role, Some(ItemRole::User))
&& !self.item_delta_seen.contains(&data.item.item_id)
{
if let Some(delta) = text_delta_from_parts(&data.item.content) {
@ -736,6 +740,15 @@ impl SessionState {
}
}
}
if event.event_type == UniversalEventType::PermissionRequested
&& self.permission_mode == "acceptEdits"
{
if let UniversalEventData::Permission(ref data) = event.data {
if is_file_change_action(&data.action) {
return None;
}
}
}
self.events.push(event.clone());
let _ = self.broadcaster.send(event.clone());
@ -1853,6 +1866,49 @@ impl SessionManager {
Ok(())
}
pub(crate) async fn set_session_title(
&self,
session_id: &str,
title: String,
) -> Result<(), SandboxError> {
let mut sessions = self.sessions.lock().await;
let Some(session) = SessionManager::session_mut(&mut sessions, session_id) else {
return Err(SandboxError::SessionNotFound {
session_id: session_id.to_string(),
});
};
session.title = Some(title);
session.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(session.updated_at);
Ok(())
}
pub(crate) async fn delete_session(&self, session_id: &str) -> Result<(), SandboxError> {
let (agent, native_session_id) = {
let mut sessions = self.sessions.lock().await;
let Some(index) = sessions
.iter()
.position(|session| session.session_id == session_id)
else {
return Err(SandboxError::SessionNotFound {
session_id: session_id.to_string(),
});
};
let session = sessions.remove(index);
(session.agent, session.native_session_id)
};
if agent == AgentId::Opencode || agent == AgentId::Codex {
self.server_manager
.unregister_session(agent, session_id, native_session_id.as_deref())
.await;
}
Ok(())
}
async fn agent_modes(&self, agent: AgentId) -> Result<Vec<AgentModeInfo>, SandboxError> {
if agent != AgentId::Opencode {
return Ok(agent_modes_for(agent));
@ -1946,6 +2002,14 @@ impl SessionManager {
) -> Result<(), SandboxError> {
// Use allow_ended=true and do explicit check to allow resumable agents
let session_snapshot = self.session_snapshot_for_message(&session_id).await?;
if !agent_emits_turn_started(session_snapshot.agent) {
let _ = self
.record_conversions(
&session_id,
vec![turn_started_event(None, None).synthetic()],
)
.await;
}
if session_snapshot.agent == AgentId::Mock {
self.send_mock_message(session_id, message).await?;
return Ok(());
@ -2568,46 +2632,7 @@ impl SessionManager {
.ok_or_else(|| SandboxError::InvalidRequest {
message: "missing codex permission metadata".to_string(),
})?;
let metadata = pending.metadata.clone().unwrap_or(Value::Null);
let request_id = codex_request_id_from_metadata(&metadata)
.or_else(|| codex_request_id_from_string(permission_id))
.ok_or_else(|| SandboxError::InvalidRequest {
message: "invalid codex permission request id".to_string(),
})?;
let request_kind = metadata
.get("codexRequestKind")
.and_then(Value::as_str)
.unwrap_or("");
let response_value = match request_kind {
"commandExecution" => {
let decision = codex_command_decision_for_reply(reply.clone());
let response =
codex_schema::CommandExecutionRequestApprovalResponse { decision };
serde_json::to_value(response).map_err(|err| SandboxError::InvalidRequest {
message: err.to_string(),
})?
}
"fileChange" => {
let decision = codex_file_change_decision_for_reply(reply.clone());
let response = codex_schema::FileChangeRequestApprovalResponse { decision };
serde_json::to_value(response).map_err(|err| SandboxError::InvalidRequest {
message: err.to_string(),
})?
}
_ => {
return Err(SandboxError::InvalidRequest {
message: "unsupported codex permission request".to_string(),
});
}
};
let response = codex_schema::JsonrpcResponse {
id: request_id,
result: response_value,
};
let line =
serde_json::to_string(&response).map_err(|err| SandboxError::InvalidRequest {
message: err.to_string(),
})?;
let line = codex_permission_response_line(permission_id, &pending, reply.clone())?;
server
.stdin_sender
.send(line)
@ -2977,8 +3002,23 @@ impl SessionManager {
session_id: session_id.to_string(),
}
})?;
let mut accept_edits_permission_ids = Vec::new();
if session.agent == AgentId::Codex && session.permission_mode == "acceptEdits" {
for conversion in &conversions {
if conversion.event_type != UniversalEventType::PermissionRequested {
continue;
}
let UniversalEventData::Permission(data) = &conversion.data else {
continue;
};
if is_file_change_action(&data.action) {
accept_edits_permission_ids.push(data.permission_id.clone());
}
}
}
let events = session.record_conversions(conversions);
let mut auto_approvals = Vec::new();
let mut seen = HashSet::new();
for event in &events {
if event.event_type != UniversalEventType::PermissionRequested {
continue;
@ -2987,10 +3027,7 @@ impl SessionManager {
continue;
};
let cached = session.should_auto_approve_permission(&data.action, &data.metadata);
if session.agent == AgentId::Codex
|| is_question_tool_action(&data.action)
|| !cached
{
if is_question_tool_action(&data.action) || !cached {
continue;
}
if let Some(pending) = session.take_permission(&data.permission_id) {
@ -3000,14 +3037,49 @@ impl SessionManager {
session.claude_sender(),
data.permission_id.clone(),
pending,
PermissionReply::Always,
));
seen.insert(data.permission_id.clone());
}
}
for permission_id in accept_edits_permission_ids {
if seen.contains(&permission_id) {
continue;
}
if let Some(pending) = session.take_permission(&permission_id) {
auto_approvals.push((
session.agent,
session.native_session_id.clone(),
session.claude_sender(),
permission_id.clone(),
pending,
PermissionReply::Always,
));
seen.insert(permission_id);
}
}
(events, auto_approvals)
};
for (agent, native_session_id, claude_sender, permission_id, pending) in auto_approvals {
for (agent, native_session_id, claude_sender, permission_id, pending, reply) in
auto_approvals
{
let reply_for_status = reply.clone();
let reply_result = match agent {
AgentId::Codex => {
let (server, _) = self
.server_manager
.ensure_stdio_server(AgentId::Codex)
.await?;
let line =
codex_permission_response_line(&permission_id, &pending, reply.clone())?;
server
.stdin_sender
.send(line)
.map_err(|_| SandboxError::InvalidRequest {
message: "codex server not active".to_string(),
})
}
AgentId::Opencode => {
let agent_session_id =
native_session_id
@ -3020,7 +3092,7 @@ impl SessionManager {
self.opencode_permission_reply(
&agent_session_id,
&permission_id,
PermissionReply::Always,
reply.clone(),
)
.await
}
@ -3039,12 +3111,27 @@ impl SessionManager {
.cloned()
.unwrap_or(Value::Null);
let mut response_map = serde_json::Map::new();
if !updated_input.is_null() {
response_map.insert("updatedInput".to_string(), updated_input);
match reply.clone() {
PermissionReply::Reject => {
response_map.insert(
"message".to_string(),
Value::String("Permission denied.".to_string()),
);
}
PermissionReply::Once | PermissionReply::Always => {
if !updated_input.is_null() {
response_map
.insert("updatedInput".to_string(), updated_input);
}
}
}
let behavior = match reply.clone() {
PermissionReply::Reject => "deny",
PermissionReply::Once | PermissionReply::Always => "allow",
};
let line = claude_control_response_line(
&permission_id,
"allow",
behavior,
Value::Object(response_map),
);
sender.send(line).map_err(|_| SandboxError::InvalidRequest {
@ -3078,7 +3165,11 @@ impl SessionManager {
UniversalEventData::Permission(PermissionEventData {
permission_id: permission_id.clone(),
action: pending.action,
status: PermissionStatus::AcceptForSession,
status: match reply_for_status {
PermissionReply::Reject => PermissionStatus::Reject,
PermissionReply::Once => PermissionStatus::Accept,
PermissionReply::Always => PermissionStatus::AcceptForSession,
},
metadata: pending.metadata,
}),
)
@ -5007,6 +5098,10 @@ fn agent_supports_item_started(agent: AgentId) -> bool {
agent_capabilities_for(agent).item_started
}
fn agent_emits_turn_started(agent: AgentId) -> bool {
matches!(agent, AgentId::Codex | AgentId::Opencode)
}
fn agent_capabilities_for(agent: AgentId) -> AgentCapabilities {
match agent {
// Claude CLI supports tool calls/results and permission prompts via the SDK control protocol,
@ -5375,7 +5470,7 @@ fn normalize_permission_mode(
agent: AgentId,
permission_mode: Option<&str>,
) -> Result<String, SandboxError> {
let mode = match permission_mode.unwrap_or("default") {
let mut mode = match permission_mode.unwrap_or("default") {
"default" | "plan" | "bypass" | "acceptEdits" => permission_mode.unwrap_or("default"),
value => {
return Err(SandboxError::InvalidRequest {
@ -5384,6 +5479,10 @@ fn normalize_permission_mode(
.into())
}
};
if agent != AgentId::Claude && mode == "acceptEdits" && agent != AgentId::Codex {
// acceptEdits is Claude-only unless explicitly handled; treat it as a no-op for other agents.
mode = "default";
}
if agent == AgentId::Claude {
// Claude refuses --dangerously-skip-permissions when running as root,
// which is common in container environments (Docker, Daytona, E2B).
@ -5402,7 +5501,7 @@ fn normalize_permission_mode(
}
let supported = match agent {
AgentId::Claude => false,
AgentId::Codex => matches!(mode, "default" | "plan" | "bypass"),
AgentId::Codex => matches!(mode, "default" | "plan" | "bypass" | "acceptEdits"),
AgentId::Amp => matches!(mode, "default" | "bypass"),
AgentId::Opencode => matches!(mode, "default"),
AgentId::Mock => matches!(mode, "default" | "plan" | "bypass"),
@ -5482,14 +5581,30 @@ fn build_spawn_options(
}
});
if let Some(anthropic) = credentials.anthropic {
options
.env
.entry("ANTHROPIC_API_KEY".to_string())
.or_insert(anthropic.api_key.clone());
options
.env
.entry("CLAUDE_API_KEY".to_string())
.or_insert(anthropic.api_key);
let should_inject_claude_env = !(session.agent == AgentId::Claude
&& anthropic.source == "claude-code"
&& anthropic.provider == "anthropic");
if should_inject_claude_env {
if session.agent == AgentId::Claude && anthropic.auth_type == AuthType::Oauth {
options
.env
.entry("CLAUDE_CODE_OAUTH_TOKEN".to_string())
.or_insert(anthropic.api_key.clone());
options
.env
.entry("ANTHROPIC_AUTH_TOKEN".to_string())
.or_insert(anthropic.api_key);
} else {
options
.env
.entry("ANTHROPIC_API_KEY".to_string())
.or_insert(anthropic.api_key.clone());
options
.env
.entry("CLAUDE_API_KEY".to_string())
.or_insert(anthropic.api_key);
}
}
}
if let Some(openai) = credentials.openai {
options
@ -5504,6 +5619,102 @@ fn build_spawn_options(
options
}
#[cfg(test)]
mod tests {
use super::*;
fn test_snapshot(agent: AgentId) -> SessionSnapshot {
SessionSnapshot {
session_id: "test-session".to_string(),
agent,
agent_mode: "build".to_string(),
permission_mode: "default".to_string(),
model: None,
variant: None,
native_session_id: None,
}
}
fn claude_code_api_key_credentials() -> ExtractedCredentials {
ExtractedCredentials {
anthropic: Some(ProviderCredentials {
api_key: "sk-ant-test".to_string(),
source: "claude-code".to_string(),
auth_type: AuthType::ApiKey,
provider: "anthropic".to_string(),
}),
openai: None,
other: HashMap::new(),
}
}
fn environment_oauth_credentials() -> ExtractedCredentials {
ExtractedCredentials {
anthropic: Some(ProviderCredentials {
api_key: "oauth-token".to_string(),
source: "environment".to_string(),
auth_type: AuthType::Oauth,
provider: "anthropic".to_string(),
}),
openai: None,
other: HashMap::new(),
}
}
#[test]
fn build_spawn_options_skips_claude_env_for_claude_code_source() {
let options = build_spawn_options(
&test_snapshot(AgentId::Claude),
"hello".to_string(),
claude_code_api_key_credentials(),
);
assert!(!options.env.contains_key("ANTHROPIC_API_KEY"));
assert!(!options.env.contains_key("CLAUDE_API_KEY"));
}
#[test]
fn build_spawn_options_keeps_anthropic_env_for_non_claude_agent() {
let options = build_spawn_options(
&test_snapshot(AgentId::Amp),
"hello".to_string(),
claude_code_api_key_credentials(),
);
assert_eq!(
options.env.get("ANTHROPIC_API_KEY").map(String::as_str),
Some("sk-ant-test")
);
assert_eq!(
options.env.get("CLAUDE_API_KEY").map(String::as_str),
Some("sk-ant-test")
);
}
#[test]
fn build_spawn_options_uses_oauth_env_for_claude_oauth_credentials() {
let options = build_spawn_options(
&test_snapshot(AgentId::Claude),
"hello".to_string(),
environment_oauth_credentials(),
);
assert_eq!(
options
.env
.get("CLAUDE_CODE_OAUTH_TOKEN")
.map(String::as_str),
Some("oauth-token")
);
assert_eq!(
options.env.get("ANTHROPIC_AUTH_TOKEN").map(String::as_str),
Some("oauth-token")
);
assert!(!options.env.contains_key("ANTHROPIC_API_KEY"));
assert!(!options.env.contains_key("CLAUDE_API_KEY"));
}
}
fn claude_input_session_id(session: &SessionSnapshot) -> String {
session
.native_session_id
@ -5594,6 +5805,11 @@ pub(crate) fn is_question_tool_action(action: &str) -> bool {
)
}
fn is_file_change_action(action: &str) -> bool {
matches!(action, "fileChange" | "file_change" | "file-change")
|| action.eq_ignore_ascii_case("filechange")
}
fn permission_cache_keys(action: &str, metadata: &Option<Value>) -> Vec<String> {
let mut keys = Vec::new();
push_permission_cache_key(&mut keys, action);
@ -6187,6 +6403,51 @@ fn codex_rpc_error_to_universal(error: &codex_schema::JsonrpcError) -> EventConv
EventConversion::new(UniversalEventType::Error, UniversalEventData::Error(data))
}
fn codex_permission_response_line(
permission_id: &str,
pending: &PendingPermission,
reply: PermissionReply,
) -> Result<String, SandboxError> {
let metadata = pending.metadata.clone().unwrap_or(Value::Null);
let request_id = codex_request_id_from_metadata(&metadata)
.or_else(|| codex_request_id_from_string(permission_id))
.ok_or_else(|| SandboxError::InvalidRequest {
message: "invalid codex permission request id".to_string(),
})?;
let request_kind = metadata
.get("codexRequestKind")
.and_then(Value::as_str)
.unwrap_or("");
let response_value = match request_kind {
"commandExecution" => {
let decision = codex_command_decision_for_reply(reply);
let response = codex_schema::CommandExecutionRequestApprovalResponse { decision };
serde_json::to_value(response).map_err(|err| SandboxError::InvalidRequest {
message: err.to_string(),
})?
}
"fileChange" => {
let decision = codex_file_change_decision_for_reply(reply);
let response = codex_schema::FileChangeRequestApprovalResponse { decision };
serde_json::to_value(response).map_err(|err| SandboxError::InvalidRequest {
message: err.to_string(),
})?
}
_ => {
return Err(SandboxError::InvalidRequest {
message: "unsupported codex permission request".to_string(),
});
}
};
let response = codex_schema::JsonrpcResponse {
id: request_id,
result: response_value,
};
serde_json::to_string(&response).map_err(|err| SandboxError::InvalidRequest {
message: err.to_string(),
})
}
fn codex_request_id_from_metadata(metadata: &Value) -> Option<codex_schema::RequestId> {
let metadata = metadata.as_object()?;
let value = metadata.get("codexRequestId")?;
@ -6704,13 +6965,13 @@ fn mock_command_conversions(prefix: &str, input: &str) -> Vec<EventConversion> {
return vec![];
}
let mut events = mock_command_events(prefix, trimmed);
if should_append_turn_completed(&events) {
events.push(turn_completed_event());
if should_append_turn_ended(&events) {
events.push(turn_ended_event(None, None).synthetic());
}
events
}
fn should_append_turn_completed(events: &[EventConversion]) -> bool {
fn should_append_turn_ended(events: &[EventConversion]) -> bool {
let Some(last) = events.last() else {
return false;
};
@ -7559,34 +7820,16 @@ fn stream_turn_events(
fn is_turn_terminal(event: &UniversalEvent, _agent: AgentId) -> bool {
match event.event_type {
UniversalEventType::SessionEnded
UniversalEventType::TurnEnded
| UniversalEventType::SessionEnded
| UniversalEventType::Error
| UniversalEventType::AgentUnparsed
| UniversalEventType::PermissionRequested
| UniversalEventType::QuestionRequested => true,
UniversalEventType::ItemCompleted => {
let UniversalEventData::Item(ItemEventData { item }) = &event.data else {
return false;
};
matches!(status_label(item), Some("turn.completed" | "session.idle"))
}
_ => false,
}
}
fn status_label(item: &UniversalItem) -> Option<&str> {
if item.kind != ItemKind::Status {
return None;
}
item.content.iter().find_map(|part| {
if let ContentPart::Status { label, .. } = part {
Some(label.as_str())
} else {
None
}
})
}
fn to_sse_event(event: UniversalEvent) -> Event {
Event::default()
.json_data(&event)

View file

@ -1048,6 +1048,13 @@ async fn run_turn_stream_check(app: &Router, config: &TestAgentConfig) {
create_session(app, config.agent, &session_id, test_permission_mode(config.agent)).await;
let events = read_turn_stream_events(app, &session_id, Duration::from_secs(120)).await;
assert!(
events
.iter()
.any(|event| event.get("type").and_then(Value::as_str) == Some("turn.ended")),
"turn stream did not include turn.ended for {}",
config.agent
);
let events = truncate_after_first_stop(&events);
assert!(
!events.is_empty(),

View file

@ -17,6 +17,25 @@ describe("OpenCode-compatible Event Streaming", () => {
let handle: SandboxAgentHandle;
let client: OpencodeClient;
function uniqueSessionId(prefix: string): string {
return `${prefix}-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
}
async function initSessionViaHttp(
sessionId: string,
body: Record<string, unknown>
): Promise<void> {
const response = await fetch(`${handle.baseUrl}/opencode/session/${sessionId}/init`, {
method: "POST",
headers: {
Authorization: `Bearer ${handle.token}`,
"Content-Type": "application/json",
},
body: JSON.stringify(body),
});
expect(response.ok).toBe(true);
}
beforeAll(async () => {
await buildSandboxAgent();
});
@ -144,6 +163,129 @@ describe("OpenCode-compatible Event Streaming", () => {
expect(response.data).toBeDefined();
});
it("should be idle before first prompt and return to idle after prompt completion", async () => {
const sessionId = uniqueSessionId("status-idle");
await initSessionViaHttp(sessionId, { providerID: "mock", modelID: "mock" });
const initial = await client.session.status();
expect(initial.data?.[sessionId]?.type).toBe("idle");
const eventStream = await client.event.subscribe();
const statuses: string[] = [];
const collectIdle = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(
() => reject(new Error("Timed out waiting for session.idle")),
15_000
);
(async () => {
try {
for await (const event of (eventStream as any).stream) {
if (event?.properties?.sessionID !== sessionId) continue;
if (event.type === "session.status") {
const statusType = event?.properties?.status?.type;
if (typeof statusType === "string") statuses.push(statusType);
}
if (event.type === "session.idle") {
clearTimeout(timeout);
resolve();
break;
}
}
} catch {
// Stream ended
}
})();
});
await client.session.prompt({
path: { id: sessionId },
body: {
model: { providerID: "mock", modelID: "mock" },
parts: [{ type: "text", text: "Say hello" }],
},
});
await collectIdle;
expect(statuses).toContain("busy");
const finalStatus = await client.session.status();
expect(finalStatus.data?.[sessionId]?.type).toBe("idle");
});
it("should emit session.error and return idle for failed turns", async () => {
const sessionId = uniqueSessionId("status-error");
await initSessionViaHttp(sessionId, { providerID: "mock", modelID: "mock" });
const eventStream = await client.event.subscribe();
const errors: any[] = [];
const idles: any[] = [];
const collectErrorAndIdle = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(
() => reject(new Error("Timed out waiting for session.error + session.idle")),
15_000
);
(async () => {
try {
for await (const event of (eventStream as any).stream) {
if (event?.properties?.sessionID !== sessionId) continue;
if (event.type === "session.error") {
errors.push(event);
}
if (event.type === "session.idle") {
idles.push(event);
}
if (errors.length > 0 && idles.length > 0) {
clearTimeout(timeout);
resolve();
break;
}
}
} catch {
// Stream ended
}
})();
});
await client.session.prompt({
path: { id: sessionId },
body: {
model: { providerID: "mock", modelID: "mock" },
parts: [{ type: "text", text: "error" }],
},
});
await collectErrorAndIdle;
expect(errors.length).toBeGreaterThan(0);
const finalStatus = await client.session.status();
expect(finalStatus.data?.[sessionId]?.type).toBe("idle");
});
it("should report idle for newly initialized sessions across connected providers", async () => {
const providersResponse = await fetch(`${handle.baseUrl}/opencode/provider`, {
headers: { Authorization: `Bearer ${handle.token}` },
});
expect(providersResponse.ok).toBe(true);
const providersData = await providersResponse.json();
const connected: string[] = providersData.connected ?? [];
const defaults: Record<string, string> = providersData.default ?? {};
for (const providerID of connected) {
const modelID = defaults[providerID];
if (!modelID) continue;
const sessionId = uniqueSessionId(`status-${providerID.replace(/[^a-zA-Z0-9_-]/g, "_")}`);
await initSessionViaHttp(sessionId, { providerID, modelID });
const status = await client.session.status();
expect(status.data?.[sessionId]?.type).toBe("idle");
}
});
});
describe("session.idle count", () => {

View file

@ -43,6 +43,67 @@ describe("OpenCode-compatible Session API", () => {
return session?.permissionMode;
}
async function getBackingSession(sessionId: string) {
const response = await fetch(`${handle.baseUrl}/v1/sessions`, {
headers: { Authorization: `Bearer ${handle.token}` },
});
expect(response.ok).toBe(true);
const data = await response.json();
return (data.sessions ?? []).find((item: any) => item.sessionId === sessionId);
}
async function initSessionViaHttp(
sessionId: string,
body: Record<string, unknown> = {}
): Promise<{ response: Response; data: any }> {
const response = await fetch(`${handle.baseUrl}/opencode/session/${sessionId}/init`, {
method: "POST",
headers: {
Authorization: `Bearer ${handle.token}`,
"Content-Type": "application/json",
},
body: JSON.stringify(body),
});
const data = await response.json();
return { response, data };
}
async function listMessagesViaHttp(sessionId: string): Promise<any[]> {
const response = await fetch(`${handle.baseUrl}/opencode/session/${sessionId}/message`, {
headers: { Authorization: `Bearer ${handle.token}` },
});
expect(response.ok).toBe(true);
return response.json();
}
async function getProvidersViaHttp(): Promise<{
connected: string[];
default: Record<string, string>;
}> {
const response = await fetch(`${handle.baseUrl}/opencode/provider`, {
headers: { Authorization: `Bearer ${handle.token}` },
});
expect(response.ok).toBe(true);
const data = await response.json();
return {
connected: data.connected ?? [],
default: data.default ?? {},
};
}
async function waitForAssistantMessage(sessionId: string, timeoutMs = 10_000): Promise<any> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const messages = await listMessagesViaHttp(sessionId);
const assistant = messages.find((message) => message?.info?.role === "assistant");
if (assistant) {
return assistant;
}
await new Promise((resolve) => setTimeout(resolve, 100));
}
throw new Error("Timed out waiting for assistant message");
}
beforeAll(async () => {
// Build the binary if needed
await buildSandboxAgent();
@ -145,6 +206,78 @@ describe("OpenCode-compatible Session API", () => {
});
});
describe("session.init", () => {
it("should accept empty init body and keep message flow working", async () => {
const session = await client.session.create();
const sessionId = session.data?.id!;
expect(sessionId).toBeDefined();
const initialized = await initSessionViaHttp(sessionId, {});
expect(initialized.response.ok).toBe(true);
expect(initialized.data).toBe(true);
const prompt = await client.session.prompt({
path: { id: sessionId },
body: {
parts: [{ type: "text", text: "hello after init" }],
} as any,
});
expect(prompt.error).toBeUndefined();
const assistant = await waitForAssistantMessage(sessionId);
expect(assistant?.info?.role).toBe("assistant");
});
it("should apply explicit init model selection to the backing session", async () => {
const session = await client.session.create();
const sessionId = session.data?.id!;
expect(sessionId).toBeDefined();
const initialized = await initSessionViaHttp(sessionId, {
providerID: "codex",
modelID: "gpt-5",
messageID: "msg_init",
});
expect(initialized.response.ok).toBe(true);
expect(initialized.data).toBe(true);
const backingSession = await getBackingSession(sessionId);
expect(backingSession?.agent).toBe("codex");
expect(backingSession?.model).toBe("gpt-5");
});
it("should accept first prompt after codex init without session-not-found", async () => {
const providers = await getProvidersViaHttp();
if (!providers.connected.includes("codex")) {
return;
}
const codexDefaultModel = providers.default?.codex;
if (!codexDefaultModel) {
return;
}
const session = await client.session.create();
const sessionId = session.data?.id!;
expect(sessionId).toBeDefined();
const initialized = await initSessionViaHttp(sessionId, {
providerID: "codex",
modelID: codexDefaultModel,
});
expect(initialized.response.ok).toBe(true);
expect(initialized.data).toBe(true);
const prompt = await client.session.prompt({
path: { id: sessionId },
body: {
model: { providerID: "codex", modelID: codexDefaultModel },
parts: [{ type: "text", text: "hello after codex init" }],
},
});
expect(prompt.error).toBeUndefined();
});
});
describe("session.get", () => {
it("should retrieve session by ID", async () => {
const created = await client.session.create({ body: { title: "Test" } });

View file

@ -82,6 +82,46 @@ async fn http_events_snapshots() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn accept_edits_noop_for_non_claude() {
let app = TestApp::new();
let session_id = "accept-edits-noop";
let (status, _) = send_json(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_id}"),
Some(json!({
"agent": AgentId::Mock.as_str(),
"permissionMode": "acceptEdits"
})),
)
.await;
assert_eq!(status, StatusCode::OK, "create session with acceptEdits");
let (status, sessions) = send_json(&app.app, Method::GET, "/v1/sessions", None).await;
assert_eq!(status, StatusCode::OK, "list sessions");
let sessions = sessions
.get("sessions")
.and_then(Value::as_array)
.expect("sessions list");
let session = sessions
.iter()
.find(|entry| {
entry
.get("sessionId")
.and_then(Value::as_str)
.is_some_and(|id| id == session_id)
})
.expect("created session");
let permission_mode = session
.get("permissionMode")
.and_then(Value::as_str)
.expect("permissionMode");
assert_eq!(permission_mode, "default");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sse_events_snapshots() {
let configs = test_agents_from_env().expect("configure SANDBOX_TEST_AGENTS or install agents");
@ -125,6 +165,11 @@ async fn turn_stream_route() {
let configs = test_agents_from_env().expect("configure SANDBOX_TEST_AGENTS or install agents");
for config in &configs {
// OpenCode's embedded bun can hang while installing plugins, which blocks turn streaming.
// OpenCode turn behavior is covered by the dedicated opencode-compat suite.
if config.agent == AgentId::Opencode {
continue;
}
let app = TestApp::new();
let capabilities = fetch_capabilities(&app.app).await;
let caps = capabilities
@ -137,6 +182,34 @@ async fn turn_stream_route() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn turn_stream_emits_turn_lifecycle_for_mock() {
let app = TestApp::new();
install_agent(&app.app, AgentId::Mock).await;
let session_id = "turn-lifecycle-mock";
create_session(
&app.app,
AgentId::Mock,
session_id,
test_permission_mode(AgentId::Mock),
)
.await;
let events = read_turn_stream_events(&app.app, session_id, Duration::from_secs(30)).await;
let started_count = events
.iter()
.filter(|event| event.get("type").and_then(Value::as_str) == Some("turn.started"))
.count();
let ended_count = events
.iter()
.filter(|event| event.get("type").and_then(Value::as_str) == Some("turn.ended"))
.count();
assert_eq!(started_count, 1, "expected exactly one turn.started event");
assert_eq!(ended_count, 1, "expected exactly one turn.ended event");
}
async fn run_concurrency_snapshot(app: &Router, config: &TestAgentConfig) {
let _guard = apply_credentials(&config.credentials);
install_agent(app, config.agent).await;

View file

@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/sessions/multi_turn.rs
assertion_line: 15
expression: value
---
first:
@ -15,19 +16,13 @@ first:
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 4
seq: 3
type: item.completed
- item:
content_types:
@ -35,13 +30,13 @@ first:
kind: message
role: assistant
status: in_progress
seq: 5
seq: 4
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 6
seq: 5
type: item.delta
- item:
content_types:
@ -49,7 +44,7 @@ first:
kind: message
role: assistant
status: completed
seq: 7
seq: 6
type: item.completed
second:
- item:
@ -60,19 +55,13 @@ second:
status: in_progress
seq: 1
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 2
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 3
seq: 2
type: item.completed
- item:
content_types:
@ -80,13 +69,13 @@ second:
kind: message
role: assistant
status: in_progress
seq: 4
seq: 3
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 5
seq: 4
type: item.delta
- item:
content_types:
@ -94,5 +83,5 @@ second:
kind: message
role: assistant
status: completed
seq: 6
seq: 5
type: item.completed

View file

@ -8,20 +8,16 @@ first:
seq: 1
session: started
type: session.started
- seq: 2
type: turn.started
- item:
content_types:
- text
kind: message
role: user
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
type: item.started
- item:
content_types:
- text
@ -69,47 +65,13 @@ first:
seq: 10
type: item.delta
second:
- seq: 1
type: turn.started
- item:
content_types:
- text
kind: message
role: user
status: in_progress
seq: 1
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
role: assistant
status: completed
seq: 2
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 3
type: item.completed
- item:
content_types:
- text
kind: message
role: assistant
status: in_progress
seq: 4
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 5
type: item.delta
- item:
content_types:
- text
kind: message
role: assistant
status: completed
seq: 6
type: item.completed

View file

@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/sessions/permissions.rs
assertion_line: 12
expression: value
---
- metadata: true
@ -14,23 +15,17 @@ expression: value
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 4
seq: 3
type: item.completed
- permission:
action: command_execution
id: "<redacted>"
status: requested
seq: 5
seq: 4
type: permission.requested

View file

@ -7,20 +7,16 @@ expression: value
seq: 1
session: started
type: session.started
- seq: 2
type: turn.started
- item:
content_types:
- text
kind: message
role: user
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
type: item.started
- item:
content_types:
- text
@ -61,3 +57,9 @@ expression: value
native_item_id: "<redacted>"
seq: 9
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 10
type: item.delta

View file

@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/sessions/questions.rs
assertion_line: 12
expression: value
---
- metadata: true
@ -14,23 +15,17 @@ expression: value
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 4
seq: 3
type: item.completed
- question:
id: "<redacted>"
options: 2
status: requested
seq: 5
seq: 4
type: question.requested

View file

@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/sessions/questions.rs
assertion_line: 12
expression: value
---
- metadata: true
@ -14,23 +15,17 @@ expression: value
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 4
seq: 3
type: item.completed
- question:
id: "<redacted>"
options: 2
status: requested
seq: 5
seq: 4
type: question.requested

View file

@ -7,20 +7,16 @@ expression: value
seq: 1
session: started
type: session.started
- seq: 2
type: turn.started
- item:
content_types:
- text
kind: message
role: user
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
type: item.started
- item:
content_types:
- text
@ -43,95 +39,11 @@ expression: value
native_item_id: "<redacted>"
seq: 6
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 7
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 8
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 9
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 10
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 11
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 12
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 13
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 14
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 15
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 16
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 17
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 18
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 19
type: item.delta
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 20
type: item.delta
- item:
content_types:
- text
kind: message
role: assistant
status: completed
seq: 21
seq: 7
type: item.completed

View file

@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/sessions/session_lifecycle.rs
assertion_line: 12
expression: value
---
session_a:
@ -15,19 +16,13 @@ session_a:
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 4
seq: 3
type: item.completed
- item:
content_types:
@ -35,13 +30,13 @@ session_a:
kind: message
role: assistant
status: in_progress
seq: 5
seq: 4
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 6
seq: 5
type: item.delta
- item:
content_types:
@ -49,7 +44,7 @@ session_a:
kind: message
role: assistant
status: completed
seq: 7
seq: 6
type: item.completed
session_b:
- metadata: true
@ -64,19 +59,13 @@ session_b:
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 4
seq: 3
type: item.completed
- item:
content_types:
@ -84,13 +73,13 @@ session_b:
kind: message
role: assistant
status: in_progress
seq: 5
seq: 4
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 6
seq: 5
type: item.delta
- item:
content_types:
@ -98,5 +87,5 @@ session_b:
kind: message
role: assistant
status: completed
seq: 7
seq: 6
type: item.completed

View file

@ -8,20 +8,16 @@ session_a:
seq: 1
session: started
type: session.started
- seq: 2
type: turn.started
- item:
content_types:
- text
kind: message
role: user
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
type: item.started
- item:
content_types:
- text
@ -49,20 +45,16 @@ session_b:
seq: 1
session: started
type: session.started
- seq: 2
type: turn.started
- item:
content_types:
- text
kind: message
role: user
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
type: item.started
- item:
content_types:
- text

View file

@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/sessions/../common/http.rs
assertion_line: 1001
expression: normalized
---
- metadata: true
@ -14,19 +15,13 @@ expression: normalized
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 4
seq: 3
type: item.completed
- item:
content_types:
@ -34,13 +29,13 @@ expression: normalized
kind: message
role: assistant
status: in_progress
seq: 5
seq: 4
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 6
seq: 5
type: item.delta
- item:
content_types:
@ -48,5 +43,5 @@ expression: normalized
kind: message
role: assistant
status: completed
seq: 7
seq: 6
type: item.completed

View file

@ -7,20 +7,16 @@ expression: normalized
seq: 1
session: started
type: session.started
- seq: 2
type: turn.started
- item:
content_types:
- text
kind: message
role: user
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
type: item.started
- item:
content_types:
- text

View file

@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/sessions/../common/http.rs
assertion_line: 1039
expression: normalized
---
- metadata: true
@ -14,19 +15,13 @@ expression: normalized
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
- item:
content_types:
- text
kind: message
role: user
status: completed
seq: 4
seq: 3
type: item.completed
- item:
content_types:
@ -34,13 +29,13 @@ expression: normalized
kind: message
role: assistant
status: in_progress
seq: 5
seq: 4
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 6
seq: 5
type: item.delta
- item:
content_types:
@ -48,5 +43,5 @@ expression: normalized
kind: message
role: assistant
status: completed
seq: 7
seq: 6
type: item.completed

View file

@ -7,20 +7,16 @@ expression: normalized
seq: 1
session: started
type: session.started
- seq: 2
type: turn.started
- item:
content_types:
- text
kind: message
role: user
status: in_progress
seq: 2
type: item.started
- delta:
delta: "<redacted>"
item_id: "<redacted>"
native_item_id: "<redacted>"
seq: 3
type: item.delta
type: item.started
- item:
content_types:
- text

View file

@ -4,7 +4,7 @@ use serde_json::Value;
use crate::amp as schema;
use crate::{
turn_completed_event, ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData,
turn_ended_event, ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData,
ItemKind, ItemRole, ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy,
UniversalEventData, UniversalEventType, UniversalItem,
};
@ -99,7 +99,7 @@ pub fn event_to_universal(
));
}
schema::StreamJsonMessageType::Done => {
events.push(turn_completed_event());
events.push(turn_ended_event(None, None).synthetic());
events.push(
EventConversion::new(
UniversalEventType::SessionEnded,

View file

@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use serde_json::Value;
use crate::{
turn_completed_event, ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind,
turn_ended_event, ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind,
ItemRole, ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus,
SessionStartedData, UniversalEventData, UniversalEventType, UniversalItem,
};
@ -425,7 +425,7 @@ fn result_event_to_universal(event: &Value, session_id: &str) -> Vec<EventConver
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item: message_item }),
),
turn_completed_event(),
turn_ended_event(None, None).synthetic(),
]
}

View file

@ -4,7 +4,7 @@ use crate::codex as schema;
use crate::{
ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole,
ItemStatus, ReasoningVisibility, SessionEndReason, SessionEndedData, SessionStartedData,
TerminatedBy, UniversalEventData, UniversalEventType, UniversalItem,
TerminatedBy, TurnEventData, TurnPhase, UniversalEventData, UniversalEventType, UniversalItem,
};
/// Convert a Codex ServerNotification to universal events.
@ -36,18 +36,26 @@ pub fn notification_to_universal(
Some(params.thread_id.clone()),
raw,
)]),
schema::ServerNotification::TurnStarted(params) => Ok(vec![status_event(
"turn.started",
serde_json::to_string(&params.turn).ok(),
Some(params.thread_id.clone()),
raw,
)]),
schema::ServerNotification::TurnCompleted(params) => Ok(vec![status_event(
"turn.completed",
serde_json::to_string(&params.turn).ok(),
Some(params.thread_id.clone()),
raw,
)]),
schema::ServerNotification::TurnStarted(params) => Ok(vec![EventConversion::new(
UniversalEventType::TurnStarted,
UniversalEventData::Turn(TurnEventData {
phase: TurnPhase::Started,
turn_id: Some(params.turn.id.clone()),
metadata: serde_json::to_value(&params.turn).ok(),
}),
)
.with_native_session(Some(params.thread_id.clone()))
.with_raw(raw)]),
schema::ServerNotification::TurnCompleted(params) => Ok(vec![EventConversion::new(
UniversalEventType::TurnEnded,
UniversalEventData::Turn(TurnEventData {
phase: TurnPhase::Ended,
turn_id: Some(params.turn.id.clone()),
metadata: serde_json::to_value(&params.turn).ok(),
}),
)
.with_native_session(Some(params.thread_id.clone()))
.with_raw(raw)]),
schema::ServerNotification::TurnDiffUpdated(params) => Ok(vec![status_event(
"turn.diff.updated",
serde_json::to_string(params).ok(),

View file

@ -3,8 +3,9 @@ use serde_json::Value;
use crate::opencode as schema;
use crate::{
ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus,
PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, SessionStartedData,
UniversalEventData, UniversalEventType, UniversalItem,
PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility,
SessionStartedData, TurnEventData, TurnPhase, UniversalEventData, UniversalEventType,
UniversalItem,
};
pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>, String> {
@ -69,27 +70,37 @@ pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>,
);
}
schema::Part::ReasoningPart(reasoning_part) => {
let delta_text = delta
let reasoning_text = delta
.as_ref()
.cloned()
.unwrap_or_else(|| reasoning_part.text.clone());
let stub = stub_message_item(&message_id, ItemRole::Assistant);
let reasoning_id = reasoning_part.id.clone();
let mut started = stub_message_item(&reasoning_id, ItemRole::Assistant);
started.parent_id = Some(message_id.clone());
let completed = UniversalItem {
item_id: String::new(),
native_item_id: Some(reasoning_id),
parent_id: Some(message_id.clone()),
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content: vec![ContentPart::Reasoning {
text: reasoning_text,
visibility: ReasoningVisibility::Public,
}],
status: ItemStatus::Completed,
};
events.push(
EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item: stub }),
UniversalEventData::Item(ItemEventData { item: started }),
)
.synthetic()
.with_raw(raw.clone()),
);
events.push(
EventConversion::new(
UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(),
native_item_id: Some(message_id.clone()),
delta: delta_text,
}),
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item: completed }),
)
.with_native_session(session_id.clone())
.with_raw(raw.clone()),
@ -207,26 +218,59 @@ pub fn event_to_universal(event: &schema::Event) -> Result<Vec<EventConversion>,
properties,
type_: _,
} = status;
let status_type = serde_json::to_value(&properties.status)
.ok()
.and_then(|value| {
value
.get("type")
.and_then(Value::as_str)
.map(str::to_string)
});
let detail =
serde_json::to_string(&properties.status).unwrap_or_else(|_| "status".to_string());
let item = status_item("session.status", Some(detail));
let conversion = EventConversion::new(
let mut events = vec![EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
)
.with_native_session(Some(properties.session_id.clone()))
.with_raw(raw);
Ok(vec![conversion])
.with_raw(raw.clone())];
if matches!(status_type.as_deref(), Some("busy" | "idle")) {
let (event_type, phase) = if status_type.as_deref() == Some("busy") {
(UniversalEventType::TurnStarted, TurnPhase::Started)
} else {
(UniversalEventType::TurnEnded, TurnPhase::Ended)
};
events.push(
EventConversion::new(
event_type,
UniversalEventData::Turn(TurnEventData {
phase,
turn_id: None,
metadata: Some(
serde_json::to_value(&properties.status).unwrap_or(Value::Null),
),
}),
)
.with_native_session(Some(properties.session_id.clone()))
.with_raw(raw),
);
}
Ok(events)
}
schema::Event::SessionIdle(idle) => {
let schema::EventSessionIdle {
properties,
type_: _,
} = idle;
let item = status_item("session.idle", None);
let conversion = EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
UniversalEventType::TurnEnded,
UniversalEventData::Turn(TurnEventData {
phase: TurnPhase::Ended,
turn_id: None,
metadata: None,
}),
)
.with_native_session(Some(properties.session_id.clone()))
.with_raw(raw);
@ -528,3 +572,50 @@ fn permission_from_opencode(request: &schema::PermissionRequest) -> PermissionEv
metadata: serde_json::to_value(request).ok(),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn reasoning_part_updates_stay_typed_not_text_delta() {
let event = schema::Event::MessagePartUpdated(schema::EventMessagePartUpdated {
properties: schema::EventMessagePartUpdatedProperties {
delta: Some("Preparing friendly brief response".to_string()),
part: schema::Part::ReasoningPart(schema::ReasoningPart {
id: "part_reason_1".to_string(),
message_id: "msg_1".to_string(),
metadata: serde_json::Map::new(),
session_id: "ses_1".to_string(),
text: "Preparing".to_string(),
time: schema::ReasoningPartTime {
end: None,
start: 0.0,
},
type_: "reasoning".to_string(),
}),
},
type_: "message.part.updated".to_string(),
});
let converted = event_to_universal(&event).expect("conversion succeeds");
assert_eq!(converted.len(), 2);
assert!(converted
.iter()
.all(|entry| entry.event_type != UniversalEventType::ItemDelta));
let completed = converted
.iter()
.find(|entry| entry.event_type == UniversalEventType::ItemCompleted)
.expect("item.completed exists");
let UniversalEventData::Item(ItemEventData { item }) = &completed.data else {
panic!("expected item payload");
};
assert_eq!(item.native_item_id.as_deref(), Some("part_reason_1"));
assert!(matches!(
item.content.first(),
Some(ContentPart::Reasoning { text, .. })
if text == "Preparing friendly brief response"
));
}
}

View file

@ -40,6 +40,10 @@ pub enum UniversalEventType {
SessionStarted,
#[serde(rename = "session.ended")]
SessionEnded,
#[serde(rename = "turn.started")]
TurnStarted,
#[serde(rename = "turn.ended")]
TurnEnded,
#[serde(rename = "item.started")]
ItemStarted,
#[serde(rename = "item.delta")]
@ -63,6 +67,7 @@ pub enum UniversalEventType {
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(untagged)]
pub enum UniversalEventData {
Turn(TurnEventData),
SessionStarted(SessionStartedData),
SessionEnded(SessionEndedData),
Item(ItemEventData),
@ -93,6 +98,22 @@ pub struct SessionEndedData {
pub stderr: Option<StderrOutput>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
pub struct TurnEventData {
pub phase: TurnPhase,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub turn_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub metadata: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum TurnPhase {
Started,
Ended,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
pub struct StderrOutput {
/// First N lines of stderr (if truncated) or full stderr (if not truncated)
@ -318,25 +339,26 @@ impl EventConversion {
}
}
pub fn turn_completed_event() -> EventConversion {
pub fn turn_started_event(turn_id: Option<String>, metadata: Option<Value>) -> EventConversion {
EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData {
item: UniversalItem {
item_id: String::new(),
native_item_id: None,
parent_id: None,
kind: ItemKind::Status,
role: Some(ItemRole::System),
content: vec![ContentPart::Status {
label: "turn.completed".to_string(),
detail: None,
}],
status: ItemStatus::Completed,
},
UniversalEventType::TurnStarted,
UniversalEventData::Turn(TurnEventData {
phase: TurnPhase::Started,
turn_id,
metadata,
}),
)
}
pub fn turn_ended_event(turn_id: Option<String>, metadata: Option<Value>) -> EventConversion {
EventConversion::new(
UniversalEventType::TurnEnded,
UniversalEventData::Turn(TurnEventData {
phase: TurnPhase::Ended,
turn_id,
metadata,
}),
)
.synthetic()
}
pub fn item_from_text(role: ItemRole, text: String) -> UniversalItem {