mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-15 12:03:53 +00:00
fix: consistent turn.completed signal and OpenCode session.idle timing
This commit is contained in:
parent
b74539172b
commit
02b248c99f
6 changed files with 171 additions and 35 deletions
|
|
@ -1410,6 +1410,25 @@ async fn apply_universal_event(state: Arc<OpenCodeAppState>, event: UniversalEve
|
|||
match event.event_type {
|
||||
UniversalEventType::ItemStarted | UniversalEventType::ItemCompleted => {
|
||||
if let UniversalEventData::Item(ItemEventData { item }) = &event.data {
|
||||
// turn.completed or session.idle status → emit session.idle
|
||||
if event.event_type == UniversalEventType::ItemCompleted
|
||||
&& item.kind == ItemKind::Status
|
||||
{
|
||||
if let Some(ContentPart::Status { label, .. }) = item.content.first() {
|
||||
if label == "turn.completed" || label == "session.idle" {
|
||||
let session_id = event.session_id.clone();
|
||||
state.opencode.emit_event(json!({
|
||||
"type": "session.status",
|
||||
"properties": {"sessionID": session_id, "status": {"type": "idle"}}
|
||||
}));
|
||||
state.opencode.emit_event(json!({
|
||||
"type": "session.idle",
|
||||
"properties": {"sessionID": session_id}
|
||||
}));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
apply_item_event(state, event.clone(), item.clone()).await;
|
||||
}
|
||||
}
|
||||
|
|
@ -1894,19 +1913,6 @@ async fn apply_item_event(
|
|||
}
|
||||
}
|
||||
|
||||
if event.event_type == UniversalEventType::ItemCompleted {
|
||||
state.opencode.emit_event(json!({
|
||||
"type": "session.status",
|
||||
"properties": {
|
||||
"sessionID": session_id,
|
||||
"status": {"type": "idle"}
|
||||
}
|
||||
}));
|
||||
state.opencode.emit_event(json!({
|
||||
"type": "session.idle",
|
||||
"properties": { "sessionID": session_id }
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
async fn apply_tool_item_event(
|
||||
|
|
|
|||
|
|
@ -22,8 +22,8 @@ use reqwest::Client;
|
|||
use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError};
|
||||
use sandbox_agent_universal_agent_schema::{
|
||||
codex as codex_schema, convert_amp, convert_claude, convert_codex, convert_opencode,
|
||||
AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource, FileAction,
|
||||
ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData,
|
||||
turn_completed_event, AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource,
|
||||
FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData,
|
||||
PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason,
|
||||
SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent,
|
||||
UniversalEventData, UniversalEventType, UniversalItem,
|
||||
|
|
@ -6029,7 +6029,26 @@ fn mock_command_conversions(prefix: &str, input: &str) -> Vec<EventConversion> {
|
|||
if trimmed.is_empty() {
|
||||
return vec![];
|
||||
}
|
||||
let mut events = mock_command_events(prefix, trimmed);
|
||||
if should_append_turn_completed(&events) {
|
||||
events.push(turn_completed_event());
|
||||
}
|
||||
events
|
||||
}
|
||||
|
||||
fn should_append_turn_completed(events: &[EventConversion]) -> bool {
|
||||
let Some(last) = events.last() else {
|
||||
return false;
|
||||
};
|
||||
!matches!(
|
||||
last.event_type,
|
||||
UniversalEventType::SessionEnded
|
||||
| UniversalEventType::PermissionRequested
|
||||
| UniversalEventType::QuestionRequested
|
||||
)
|
||||
}
|
||||
|
||||
fn mock_command_events(prefix: &str, trimmed: &str) -> Vec<EventConversion> {
|
||||
if trimmed.eq_ignore_ascii_case(MOCK_OK_PROMPT) {
|
||||
return mock_assistant_message(format!("{prefix}_ok"), "OK".to_string());
|
||||
}
|
||||
|
|
@ -6864,7 +6883,7 @@ fn stream_turn_events(
|
|||
})
|
||||
}
|
||||
|
||||
fn is_turn_terminal(event: &UniversalEvent, agent: AgentId) -> bool {
|
||||
fn is_turn_terminal(event: &UniversalEvent, _agent: AgentId) -> bool {
|
||||
match event.event_type {
|
||||
UniversalEventType::SessionEnded
|
||||
| UniversalEventType::Error
|
||||
|
|
@ -6875,15 +6894,7 @@ fn is_turn_terminal(event: &UniversalEvent, agent: AgentId) -> bool {
|
|||
let UniversalEventData::Item(ItemEventData { item }) = &event.data else {
|
||||
return false;
|
||||
};
|
||||
if let Some(label) = status_label(item) {
|
||||
if label == "turn.completed" || label == "session.idle" {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
if matches!(item.role, Some(ItemRole::Assistant)) && item.kind == ItemKind::Message {
|
||||
return agent != AgentId::Codex;
|
||||
}
|
||||
false
|
||||
matches!(status_label(item), Some("turn.completed" | "session.idle"))
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -145,4 +145,98 @@ describe("OpenCode-compatible Event Streaming", () => {
|
|||
expect(response.data).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe("session.idle count", () => {
|
||||
it("should emit exactly one session.idle for echo flow", async () => {
|
||||
const session = await client.session.create();
|
||||
const sessionId = session.data?.id!;
|
||||
|
||||
const eventStream = await client.event.subscribe();
|
||||
const idleEvents: any[] = [];
|
||||
|
||||
// Wait for first idle, then linger 1s for duplicates
|
||||
const collectIdle = new Promise<void>((resolve, reject) => {
|
||||
let lingerTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000);
|
||||
(async () => {
|
||||
try {
|
||||
for await (const event of (eventStream as any).stream) {
|
||||
if (event.type === "session.idle") {
|
||||
idleEvents.push(event);
|
||||
if (!lingerTimer) {
|
||||
lingerTimer = setTimeout(() => {
|
||||
clearTimeout(timeout);
|
||||
resolve();
|
||||
}, 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Stream ended
|
||||
}
|
||||
})();
|
||||
});
|
||||
|
||||
await client.session.prompt({
|
||||
path: { id: sessionId },
|
||||
body: {
|
||||
model: { providerID: "sandbox-agent", modelID: "mock" },
|
||||
parts: [{ type: "text", text: "echo hello" }],
|
||||
},
|
||||
});
|
||||
|
||||
await collectIdle;
|
||||
expect(idleEvents.length).toBe(1);
|
||||
});
|
||||
|
||||
it("should emit exactly one session.idle for tool flow", async () => {
|
||||
const session = await client.session.create();
|
||||
const sessionId = session.data?.id!;
|
||||
|
||||
const eventStream = await client.event.subscribe();
|
||||
const allEvents: any[] = [];
|
||||
const idleEvents: any[] = [];
|
||||
|
||||
const collectIdle = new Promise<void>((resolve, reject) => {
|
||||
let lingerTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000);
|
||||
(async () => {
|
||||
try {
|
||||
for await (const event of (eventStream as any).stream) {
|
||||
allEvents.push(event);
|
||||
if (event.type === "session.idle") {
|
||||
idleEvents.push(event);
|
||||
if (!lingerTimer) {
|
||||
lingerTimer = setTimeout(() => {
|
||||
clearTimeout(timeout);
|
||||
resolve();
|
||||
}, 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Stream ended
|
||||
}
|
||||
})();
|
||||
});
|
||||
|
||||
await client.session.prompt({
|
||||
path: { id: sessionId },
|
||||
body: {
|
||||
model: { providerID: "sandbox-agent", modelID: "mock" },
|
||||
parts: [{ type: "text", text: "tool" }],
|
||||
},
|
||||
});
|
||||
|
||||
await collectIdle;
|
||||
|
||||
expect(idleEvents.length).toBe(1);
|
||||
|
||||
// All tool parts should have been emitted before idle
|
||||
const toolParts = allEvents.filter(
|
||||
(e) => e.type === "message.part.updated" && e.properties?.part?.type === "tool"
|
||||
);
|
||||
expect(toolParts.length).toBeGreaterThan(0);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -4,9 +4,9 @@ use serde_json::Value;
|
|||
|
||||
use crate::amp as schema;
|
||||
use crate::{
|
||||
ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole,
|
||||
ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy, UniversalEventData,
|
||||
UniversalEventType, UniversalItem,
|
||||
turn_completed_event, ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData,
|
||||
ItemKind, ItemRole, ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy,
|
||||
UniversalEventData, UniversalEventType, UniversalItem,
|
||||
};
|
||||
|
||||
static TEMP_ID: AtomicU64 = AtomicU64::new(1);
|
||||
|
|
@ -99,6 +99,7 @@ pub fn event_to_universal(
|
|||
));
|
||||
}
|
||||
schema::StreamJsonMessageType::Done => {
|
||||
events.push(turn_completed_event());
|
||||
events.push(
|
||||
EventConversion::new(
|
||||
UniversalEventType::SessionEnded,
|
||||
|
|
|
|||
|
|
@ -3,9 +3,9 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
|||
use serde_json::Value;
|
||||
|
||||
use crate::{
|
||||
ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus,
|
||||
PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, SessionStartedData,
|
||||
UniversalEventData, UniversalEventType, UniversalItem,
|
||||
turn_completed_event, ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind,
|
||||
ItemRole, ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus,
|
||||
SessionStartedData, UniversalEventData, UniversalEventType, UniversalItem,
|
||||
};
|
||||
|
||||
static TEMP_ID: AtomicU64 = AtomicU64::new(1);
|
||||
|
|
@ -420,10 +420,13 @@ fn result_event_to_universal(event: &Value, session_id: &str) -> Vec<EventConver
|
|||
status: ItemStatus::Completed,
|
||||
};
|
||||
|
||||
vec![EventConversion::new(
|
||||
UniversalEventType::ItemCompleted,
|
||||
UniversalEventData::Item(ItemEventData { item: message_item }),
|
||||
)]
|
||||
vec![
|
||||
EventConversion::new(
|
||||
UniversalEventType::ItemCompleted,
|
||||
UniversalEventData::Item(ItemEventData { item: message_item }),
|
||||
),
|
||||
turn_completed_event(),
|
||||
]
|
||||
}
|
||||
|
||||
fn claude_message_id(event: &Value, session_id: &str) -> String {
|
||||
|
|
|
|||
|
|
@ -317,6 +317,27 @@ impl EventConversion {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn turn_completed_event() -> EventConversion {
|
||||
EventConversion::new(
|
||||
UniversalEventType::ItemCompleted,
|
||||
UniversalEventData::Item(ItemEventData {
|
||||
item: UniversalItem {
|
||||
item_id: String::new(),
|
||||
native_item_id: None,
|
||||
parent_id: None,
|
||||
kind: ItemKind::Status,
|
||||
role: Some(ItemRole::System),
|
||||
content: vec![ContentPart::Status {
|
||||
label: "turn.completed".to_string(),
|
||||
detail: None,
|
||||
}],
|
||||
status: ItemStatus::Completed,
|
||||
},
|
||||
}),
|
||||
)
|
||||
.synthetic()
|
||||
}
|
||||
|
||||
pub fn item_from_text(role: ItemRole, text: String) -> UniversalItem {
|
||||
UniversalItem {
|
||||
item_id: String::new(),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue