From 9ab27ad1a8c08d96d231158968be0a46b92ac9e1 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 6 Feb 2026 01:38:49 -0800 Subject: [PATCH] fix: consistent turn.completed signal and OpenCode session.idle timing --- .../sandbox-agent/src/opencode_compat.rs | 32 ++++--- server/packages/sandbox-agent/src/router.rs | 35 ++++--- .../tests/opencode-compat/events.test.ts | 94 +++++++++++++++++++ .../universal-agent-schema/src/agents/amp.rs | 7 +- .../src/agents/claude.rs | 17 ++-- .../universal-agent-schema/src/lib.rs | 21 +++++ 6 files changed, 171 insertions(+), 35 deletions(-) diff --git a/server/packages/sandbox-agent/src/opencode_compat.rs b/server/packages/sandbox-agent/src/opencode_compat.rs index 982cdcc..d91d26c 100644 --- a/server/packages/sandbox-agent/src/opencode_compat.rs +++ b/server/packages/sandbox-agent/src/opencode_compat.rs @@ -1410,6 +1410,25 @@ async fn apply_universal_event(state: Arc, event: UniversalEve match event.event_type { UniversalEventType::ItemStarted | UniversalEventType::ItemCompleted => { if let UniversalEventData::Item(ItemEventData { item }) = &event.data { + // turn.completed or session.idle status → emit session.idle + if event.event_type == UniversalEventType::ItemCompleted + && item.kind == ItemKind::Status + { + if let Some(ContentPart::Status { label, .. }) = item.content.first() { + if label == "turn.completed" || label == "session.idle" { + let session_id = event.session_id.clone(); + state.opencode.emit_event(json!({ + "type": "session.status", + "properties": {"sessionID": session_id, "status": {"type": "idle"}} + })); + state.opencode.emit_event(json!({ + "type": "session.idle", + "properties": {"sessionID": session_id} + })); + return; + } + } + } apply_item_event(state, event.clone(), item.clone()).await; } } @@ -1894,19 +1913,6 @@ async fn apply_item_event( } } - if event.event_type == UniversalEventType::ItemCompleted { - state.opencode.emit_event(json!({ - "type": "session.status", - "properties": { - "sessionID": session_id, - "status": {"type": "idle"} - } - })); - state.opencode.emit_event(json!({ - "type": "session.idle", - "properties": { "sessionID": session_id } - })); - } } async fn apply_tool_item_event( diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index 8ac6e25..3746240 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -22,8 +22,8 @@ use reqwest::Client; use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError}; use sandbox_agent_universal_agent_schema::{ codex as codex_schema, convert_amp, convert_claude, convert_codex, convert_opencode, - AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource, FileAction, - ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData, + turn_completed_event, AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource, + FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason, SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent, UniversalEventData, UniversalEventType, UniversalItem, @@ -6029,7 +6029,26 @@ fn mock_command_conversions(prefix: &str, input: &str) -> Vec { if trimmed.is_empty() { return vec![]; } + let mut events = mock_command_events(prefix, trimmed); + if should_append_turn_completed(&events) { + events.push(turn_completed_event()); + } + events +} +fn should_append_turn_completed(events: &[EventConversion]) -> bool { + let Some(last) = events.last() else { + return false; + }; + !matches!( + last.event_type, + UniversalEventType::SessionEnded + | UniversalEventType::PermissionRequested + | UniversalEventType::QuestionRequested + ) +} + +fn mock_command_events(prefix: &str, trimmed: &str) -> Vec { if trimmed.eq_ignore_ascii_case(MOCK_OK_PROMPT) { return mock_assistant_message(format!("{prefix}_ok"), "OK".to_string()); } @@ -6864,7 +6883,7 @@ fn stream_turn_events( }) } -fn is_turn_terminal(event: &UniversalEvent, agent: AgentId) -> bool { +fn is_turn_terminal(event: &UniversalEvent, _agent: AgentId) -> bool { match event.event_type { UniversalEventType::SessionEnded | UniversalEventType::Error @@ -6875,15 +6894,7 @@ fn is_turn_terminal(event: &UniversalEvent, agent: AgentId) -> bool { let UniversalEventData::Item(ItemEventData { item }) = &event.data else { return false; }; - if let Some(label) = status_label(item) { - if label == "turn.completed" || label == "session.idle" { - return true; - } - } - if matches!(item.role, Some(ItemRole::Assistant)) && item.kind == ItemKind::Message { - return agent != AgentId::Codex; - } - false + matches!(status_label(item), Some("turn.completed" | "session.idle")) } _ => false, } diff --git a/server/packages/sandbox-agent/tests/opencode-compat/events.test.ts b/server/packages/sandbox-agent/tests/opencode-compat/events.test.ts index 587ebf3..61ab1f3 100644 --- a/server/packages/sandbox-agent/tests/opencode-compat/events.test.ts +++ b/server/packages/sandbox-agent/tests/opencode-compat/events.test.ts @@ -145,4 +145,98 @@ describe("OpenCode-compatible Event Streaming", () => { expect(response.data).toBeDefined(); }); }); + + describe("session.idle count", () => { + it("should emit exactly one session.idle for echo flow", async () => { + const session = await client.session.create(); + const sessionId = session.data?.id!; + + const eventStream = await client.event.subscribe(); + const idleEvents: any[] = []; + + // Wait for first idle, then linger 1s for duplicates + const collectIdle = new Promise((resolve, reject) => { + let lingerTimer: ReturnType | null = null; + const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000); + (async () => { + try { + for await (const event of (eventStream as any).stream) { + if (event.type === "session.idle") { + idleEvents.push(event); + if (!lingerTimer) { + lingerTimer = setTimeout(() => { + clearTimeout(timeout); + resolve(); + }, 1000); + } + } + } + } catch { + // Stream ended + } + })(); + }); + + await client.session.prompt({ + path: { id: sessionId }, + body: { + model: { providerID: "sandbox-agent", modelID: "mock" }, + parts: [{ type: "text", text: "echo hello" }], + }, + }); + + await collectIdle; + expect(idleEvents.length).toBe(1); + }); + + it("should emit exactly one session.idle for tool flow", async () => { + const session = await client.session.create(); + const sessionId = session.data?.id!; + + const eventStream = await client.event.subscribe(); + const allEvents: any[] = []; + const idleEvents: any[] = []; + + const collectIdle = new Promise((resolve, reject) => { + let lingerTimer: ReturnType | null = null; + const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000); + (async () => { + try { + for await (const event of (eventStream as any).stream) { + allEvents.push(event); + if (event.type === "session.idle") { + idleEvents.push(event); + if (!lingerTimer) { + lingerTimer = setTimeout(() => { + clearTimeout(timeout); + resolve(); + }, 1000); + } + } + } + } catch { + // Stream ended + } + })(); + }); + + await client.session.prompt({ + path: { id: sessionId }, + body: { + model: { providerID: "sandbox-agent", modelID: "mock" }, + parts: [{ type: "text", text: "tool" }], + }, + }); + + await collectIdle; + + expect(idleEvents.length).toBe(1); + + // All tool parts should have been emitted before idle + const toolParts = allEvents.filter( + (e) => e.type === "message.part.updated" && e.properties?.part?.type === "tool" + ); + expect(toolParts.length).toBeGreaterThan(0); + }); + }); }); diff --git a/server/packages/universal-agent-schema/src/agents/amp.rs b/server/packages/universal-agent-schema/src/agents/amp.rs index 75326fc..7134896 100644 --- a/server/packages/universal-agent-schema/src/agents/amp.rs +++ b/server/packages/universal-agent-schema/src/agents/amp.rs @@ -4,9 +4,9 @@ use serde_json::Value; use crate::amp as schema; use crate::{ - ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, - ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy, UniversalEventData, - UniversalEventType, UniversalItem, + turn_completed_event, ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData, + ItemKind, ItemRole, ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy, + UniversalEventData, UniversalEventType, UniversalItem, }; static TEMP_ID: AtomicU64 = AtomicU64::new(1); @@ -99,6 +99,7 @@ pub fn event_to_universal( )); } schema::StreamJsonMessageType::Done => { + events.push(turn_completed_event()); events.push( EventConversion::new( UniversalEventType::SessionEnded, diff --git a/server/packages/universal-agent-schema/src/agents/claude.rs b/server/packages/universal-agent-schema/src/agents/claude.rs index 5e5c7bc..94ff081 100644 --- a/server/packages/universal-agent-schema/src/agents/claude.rs +++ b/server/packages/universal-agent-schema/src/agents/claude.rs @@ -3,9 +3,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use serde_json::Value; use crate::{ - ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, - PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, SessionStartedData, - UniversalEventData, UniversalEventType, UniversalItem, + turn_completed_event, ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, + ItemRole, ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, + SessionStartedData, UniversalEventData, UniversalEventType, UniversalItem, }; static TEMP_ID: AtomicU64 = AtomicU64::new(1); @@ -420,10 +420,13 @@ fn result_event_to_universal(event: &Value, session_id: &str) -> Vec String { diff --git a/server/packages/universal-agent-schema/src/lib.rs b/server/packages/universal-agent-schema/src/lib.rs index f4735f0..d30d93f 100644 --- a/server/packages/universal-agent-schema/src/lib.rs +++ b/server/packages/universal-agent-schema/src/lib.rs @@ -317,6 +317,27 @@ impl EventConversion { } } +pub fn turn_completed_event() -> EventConversion { + EventConversion::new( + UniversalEventType::ItemCompleted, + UniversalEventData::Item(ItemEventData { + item: UniversalItem { + item_id: String::new(), + native_item_id: None, + parent_id: None, + kind: ItemKind::Status, + role: Some(ItemRole::System), + content: vec![ContentPart::Status { + label: "turn.completed".to_string(), + detail: None, + }], + status: ItemStatus::Completed, + }, + }), + ) + .synthetic() +} + pub fn item_from_text(role: ItemRole, text: String) -> UniversalItem { UniversalItem { item_id: String::new(),