fix: consistent turn.completed signal and OpenCode session.idle timing

This commit is contained in:
Nathan Flurry 2026-02-06 01:38:49 -08:00
parent f54980d1da
commit 9ab27ad1a8
6 changed files with 171 additions and 35 deletions

View file

@ -1410,6 +1410,25 @@ async fn apply_universal_event(state: Arc<OpenCodeAppState>, event: UniversalEve
match event.event_type {
UniversalEventType::ItemStarted | UniversalEventType::ItemCompleted => {
if let UniversalEventData::Item(ItemEventData { item }) = &event.data {
// turn.completed or session.idle status → emit session.idle
if event.event_type == UniversalEventType::ItemCompleted
&& item.kind == ItemKind::Status
{
if let Some(ContentPart::Status { label, .. }) = item.content.first() {
if label == "turn.completed" || label == "session.idle" {
let session_id = event.session_id.clone();
state.opencode.emit_event(json!({
"type": "session.status",
"properties": {"sessionID": session_id, "status": {"type": "idle"}}
}));
state.opencode.emit_event(json!({
"type": "session.idle",
"properties": {"sessionID": session_id}
}));
return;
}
}
}
apply_item_event(state, event.clone(), item.clone()).await;
}
}
@ -1894,19 +1913,6 @@ async fn apply_item_event(
}
}
if event.event_type == UniversalEventType::ItemCompleted {
state.opencode.emit_event(json!({
"type": "session.status",
"properties": {
"sessionID": session_id,
"status": {"type": "idle"}
}
}));
state.opencode.emit_event(json!({
"type": "session.idle",
"properties": { "sessionID": session_id }
}));
}
}
async fn apply_tool_item_event(

View file

@ -22,8 +22,8 @@ use reqwest::Client;
use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError};
use sandbox_agent_universal_agent_schema::{
codex as codex_schema, convert_amp, convert_claude, convert_codex, convert_opencode,
AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource, FileAction,
ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData,
turn_completed_event, AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource,
FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData,
PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason,
SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent,
UniversalEventData, UniversalEventType, UniversalItem,
@ -6029,7 +6029,26 @@ fn mock_command_conversions(prefix: &str, input: &str) -> Vec<EventConversion> {
if trimmed.is_empty() {
return vec![];
}
let mut events = mock_command_events(prefix, trimmed);
if should_append_turn_completed(&events) {
events.push(turn_completed_event());
}
events
}
fn should_append_turn_completed(events: &[EventConversion]) -> bool {
let Some(last) = events.last() else {
return false;
};
!matches!(
last.event_type,
UniversalEventType::SessionEnded
| UniversalEventType::PermissionRequested
| UniversalEventType::QuestionRequested
)
}
fn mock_command_events(prefix: &str, trimmed: &str) -> Vec<EventConversion> {
if trimmed.eq_ignore_ascii_case(MOCK_OK_PROMPT) {
return mock_assistant_message(format!("{prefix}_ok"), "OK".to_string());
}
@ -6864,7 +6883,7 @@ fn stream_turn_events(
})
}
fn is_turn_terminal(event: &UniversalEvent, agent: AgentId) -> bool {
fn is_turn_terminal(event: &UniversalEvent, _agent: AgentId) -> bool {
match event.event_type {
UniversalEventType::SessionEnded
| UniversalEventType::Error
@ -6875,15 +6894,7 @@ fn is_turn_terminal(event: &UniversalEvent, agent: AgentId) -> bool {
let UniversalEventData::Item(ItemEventData { item }) = &event.data else {
return false;
};
if let Some(label) = status_label(item) {
if label == "turn.completed" || label == "session.idle" {
return true;
}
}
if matches!(item.role, Some(ItemRole::Assistant)) && item.kind == ItemKind::Message {
return agent != AgentId::Codex;
}
false
matches!(status_label(item), Some("turn.completed" | "session.idle"))
}
_ => false,
}

View file

@ -145,4 +145,98 @@ describe("OpenCode-compatible Event Streaming", () => {
expect(response.data).toBeDefined();
});
});
describe("session.idle count", () => {
it("should emit exactly one session.idle for echo flow", async () => {
const session = await client.session.create();
const sessionId = session.data?.id!;
const eventStream = await client.event.subscribe();
const idleEvents: any[] = [];
// Wait for first idle, then linger 1s for duplicates
const collectIdle = new Promise<void>((resolve, reject) => {
let lingerTimer: ReturnType<typeof setTimeout> | null = null;
const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000);
(async () => {
try {
for await (const event of (eventStream as any).stream) {
if (event.type === "session.idle") {
idleEvents.push(event);
if (!lingerTimer) {
lingerTimer = setTimeout(() => {
clearTimeout(timeout);
resolve();
}, 1000);
}
}
}
} catch {
// Stream ended
}
})();
});
await client.session.prompt({
path: { id: sessionId },
body: {
model: { providerID: "sandbox-agent", modelID: "mock" },
parts: [{ type: "text", text: "echo hello" }],
},
});
await collectIdle;
expect(idleEvents.length).toBe(1);
});
it("should emit exactly one session.idle for tool flow", async () => {
const session = await client.session.create();
const sessionId = session.data?.id!;
const eventStream = await client.event.subscribe();
const allEvents: any[] = [];
const idleEvents: any[] = [];
const collectIdle = new Promise<void>((resolve, reject) => {
let lingerTimer: ReturnType<typeof setTimeout> | null = null;
const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000);
(async () => {
try {
for await (const event of (eventStream as any).stream) {
allEvents.push(event);
if (event.type === "session.idle") {
idleEvents.push(event);
if (!lingerTimer) {
lingerTimer = setTimeout(() => {
clearTimeout(timeout);
resolve();
}, 1000);
}
}
}
} catch {
// Stream ended
}
})();
});
await client.session.prompt({
path: { id: sessionId },
body: {
model: { providerID: "sandbox-agent", modelID: "mock" },
parts: [{ type: "text", text: "tool" }],
},
});
await collectIdle;
expect(idleEvents.length).toBe(1);
// All tool parts should have been emitted before idle
const toolParts = allEvents.filter(
(e) => e.type === "message.part.updated" && e.properties?.part?.type === "tool"
);
expect(toolParts.length).toBeGreaterThan(0);
});
});
});

View file

@ -4,9 +4,9 @@ use serde_json::Value;
use crate::amp as schema;
use crate::{
ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole,
ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy, UniversalEventData,
UniversalEventType, UniversalItem,
turn_completed_event, ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData,
ItemKind, ItemRole, ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy,
UniversalEventData, UniversalEventType, UniversalItem,
};
static TEMP_ID: AtomicU64 = AtomicU64::new(1);
@ -99,6 +99,7 @@ pub fn event_to_universal(
));
}
schema::StreamJsonMessageType::Done => {
events.push(turn_completed_event());
events.push(
EventConversion::new(
UniversalEventType::SessionEnded,

View file

@ -3,9 +3,9 @@ use std::sync::atomic::{AtomicU64, Ordering};
use serde_json::Value;
use crate::{
ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus,
PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, SessionStartedData,
UniversalEventData, UniversalEventType, UniversalItem,
turn_completed_event, ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind,
ItemRole, ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus,
SessionStartedData, UniversalEventData, UniversalEventType, UniversalItem,
};
static TEMP_ID: AtomicU64 = AtomicU64::new(1);
@ -420,10 +420,13 @@ fn result_event_to_universal(event: &Value, session_id: &str) -> Vec<EventConver
status: ItemStatus::Completed,
};
vec![EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item: message_item }),
)]
vec![
EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item: message_item }),
),
turn_completed_event(),
]
}
fn claude_message_id(event: &Value, session_id: &str) -> String {

View file

@ -317,6 +317,27 @@ impl EventConversion {
}
}
pub fn turn_completed_event() -> EventConversion {
EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData {
item: UniversalItem {
item_id: String::new(),
native_item_id: None,
parent_id: None,
kind: ItemKind::Status,
role: Some(ItemRole::System),
content: vec![ContentPart::Status {
label: "turn.completed".to_string(),
detail: None,
}],
status: ItemStatus::Completed,
},
}),
)
.synthetic()
}
pub fn item_from_text(role: ItemRole, text: String) -> UniversalItem {
UniversalItem {
item_id: String::new(),