mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-17 08:01:03 +00:00
fix: consistent turn.completed signal and OpenCode session.idle timing (#97)
This commit is contained in:
parent
b74539172b
commit
c0800e1a43
6 changed files with 171 additions and 35 deletions
|
|
@ -1410,6 +1410,25 @@ async fn apply_universal_event(state: Arc<OpenCodeAppState>, event: UniversalEve
|
||||||
match event.event_type {
|
match event.event_type {
|
||||||
UniversalEventType::ItemStarted | UniversalEventType::ItemCompleted => {
|
UniversalEventType::ItemStarted | UniversalEventType::ItemCompleted => {
|
||||||
if let UniversalEventData::Item(ItemEventData { item }) = &event.data {
|
if let UniversalEventData::Item(ItemEventData { item }) = &event.data {
|
||||||
|
// turn.completed or session.idle status → emit session.idle
|
||||||
|
if event.event_type == UniversalEventType::ItemCompleted
|
||||||
|
&& item.kind == ItemKind::Status
|
||||||
|
{
|
||||||
|
if let Some(ContentPart::Status { label, .. }) = item.content.first() {
|
||||||
|
if label == "turn.completed" || label == "session.idle" {
|
||||||
|
let session_id = event.session_id.clone();
|
||||||
|
state.opencode.emit_event(json!({
|
||||||
|
"type": "session.status",
|
||||||
|
"properties": {"sessionID": session_id, "status": {"type": "idle"}}
|
||||||
|
}));
|
||||||
|
state.opencode.emit_event(json!({
|
||||||
|
"type": "session.idle",
|
||||||
|
"properties": {"sessionID": session_id}
|
||||||
|
}));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
apply_item_event(state, event.clone(), item.clone()).await;
|
apply_item_event(state, event.clone(), item.clone()).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1894,19 +1913,6 @@ async fn apply_item_event(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if event.event_type == UniversalEventType::ItemCompleted {
|
|
||||||
state.opencode.emit_event(json!({
|
|
||||||
"type": "session.status",
|
|
||||||
"properties": {
|
|
||||||
"sessionID": session_id,
|
|
||||||
"status": {"type": "idle"}
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
state.opencode.emit_event(json!({
|
|
||||||
"type": "session.idle",
|
|
||||||
"properties": { "sessionID": session_id }
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn apply_tool_item_event(
|
async fn apply_tool_item_event(
|
||||||
|
|
|
||||||
|
|
@ -22,8 +22,8 @@ use reqwest::Client;
|
||||||
use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError};
|
use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError};
|
||||||
use sandbox_agent_universal_agent_schema::{
|
use sandbox_agent_universal_agent_schema::{
|
||||||
codex as codex_schema, convert_amp, convert_claude, convert_codex, convert_opencode,
|
codex as codex_schema, convert_amp, convert_claude, convert_codex, convert_opencode,
|
||||||
AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource, FileAction,
|
turn_completed_event, AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource,
|
||||||
ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData,
|
FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData,
|
||||||
PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason,
|
PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason,
|
||||||
SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent,
|
SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent,
|
||||||
UniversalEventData, UniversalEventType, UniversalItem,
|
UniversalEventData, UniversalEventType, UniversalItem,
|
||||||
|
|
@ -6029,7 +6029,26 @@ fn mock_command_conversions(prefix: &str, input: &str) -> Vec<EventConversion> {
|
||||||
if trimmed.is_empty() {
|
if trimmed.is_empty() {
|
||||||
return vec![];
|
return vec![];
|
||||||
}
|
}
|
||||||
|
let mut events = mock_command_events(prefix, trimmed);
|
||||||
|
if should_append_turn_completed(&events) {
|
||||||
|
events.push(turn_completed_event());
|
||||||
|
}
|
||||||
|
events
|
||||||
|
}
|
||||||
|
|
||||||
|
fn should_append_turn_completed(events: &[EventConversion]) -> bool {
|
||||||
|
let Some(last) = events.last() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
!matches!(
|
||||||
|
last.event_type,
|
||||||
|
UniversalEventType::SessionEnded
|
||||||
|
| UniversalEventType::PermissionRequested
|
||||||
|
| UniversalEventType::QuestionRequested
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mock_command_events(prefix: &str, trimmed: &str) -> Vec<EventConversion> {
|
||||||
if trimmed.eq_ignore_ascii_case(MOCK_OK_PROMPT) {
|
if trimmed.eq_ignore_ascii_case(MOCK_OK_PROMPT) {
|
||||||
return mock_assistant_message(format!("{prefix}_ok"), "OK".to_string());
|
return mock_assistant_message(format!("{prefix}_ok"), "OK".to_string());
|
||||||
}
|
}
|
||||||
|
|
@ -6864,7 +6883,7 @@ fn stream_turn_events(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_turn_terminal(event: &UniversalEvent, agent: AgentId) -> bool {
|
fn is_turn_terminal(event: &UniversalEvent, _agent: AgentId) -> bool {
|
||||||
match event.event_type {
|
match event.event_type {
|
||||||
UniversalEventType::SessionEnded
|
UniversalEventType::SessionEnded
|
||||||
| UniversalEventType::Error
|
| UniversalEventType::Error
|
||||||
|
|
@ -6875,15 +6894,7 @@ fn is_turn_terminal(event: &UniversalEvent, agent: AgentId) -> bool {
|
||||||
let UniversalEventData::Item(ItemEventData { item }) = &event.data else {
|
let UniversalEventData::Item(ItemEventData { item }) = &event.data else {
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
if let Some(label) = status_label(item) {
|
matches!(status_label(item), Some("turn.completed" | "session.idle"))
|
||||||
if label == "turn.completed" || label == "session.idle" {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if matches!(item.role, Some(ItemRole::Assistant)) && item.kind == ItemKind::Message {
|
|
||||||
return agent != AgentId::Codex;
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
}
|
||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -145,4 +145,98 @@ describe("OpenCode-compatible Event Streaming", () => {
|
||||||
expect(response.data).toBeDefined();
|
expect(response.data).toBeDefined();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("session.idle count", () => {
|
||||||
|
it("should emit exactly one session.idle for echo flow", async () => {
|
||||||
|
const session = await client.session.create();
|
||||||
|
const sessionId = session.data?.id!;
|
||||||
|
|
||||||
|
const eventStream = await client.event.subscribe();
|
||||||
|
const idleEvents: any[] = [];
|
||||||
|
|
||||||
|
// Wait for first idle, then linger 1s for duplicates
|
||||||
|
const collectIdle = new Promise<void>((resolve, reject) => {
|
||||||
|
let lingerTimer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000);
|
||||||
|
(async () => {
|
||||||
|
try {
|
||||||
|
for await (const event of (eventStream as any).stream) {
|
||||||
|
if (event.type === "session.idle") {
|
||||||
|
idleEvents.push(event);
|
||||||
|
if (!lingerTimer) {
|
||||||
|
lingerTimer = setTimeout(() => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
}, 1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Stream ended
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
});
|
||||||
|
|
||||||
|
await client.session.prompt({
|
||||||
|
path: { id: sessionId },
|
||||||
|
body: {
|
||||||
|
model: { providerID: "sandbox-agent", modelID: "mock" },
|
||||||
|
parts: [{ type: "text", text: "echo hello" }],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await collectIdle;
|
||||||
|
expect(idleEvents.length).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("should emit exactly one session.idle for tool flow", async () => {
|
||||||
|
const session = await client.session.create();
|
||||||
|
const sessionId = session.data?.id!;
|
||||||
|
|
||||||
|
const eventStream = await client.event.subscribe();
|
||||||
|
const allEvents: any[] = [];
|
||||||
|
const idleEvents: any[] = [];
|
||||||
|
|
||||||
|
const collectIdle = new Promise<void>((resolve, reject) => {
|
||||||
|
let lingerTimer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000);
|
||||||
|
(async () => {
|
||||||
|
try {
|
||||||
|
for await (const event of (eventStream as any).stream) {
|
||||||
|
allEvents.push(event);
|
||||||
|
if (event.type === "session.idle") {
|
||||||
|
idleEvents.push(event);
|
||||||
|
if (!lingerTimer) {
|
||||||
|
lingerTimer = setTimeout(() => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
resolve();
|
||||||
|
}, 1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Stream ended
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
});
|
||||||
|
|
||||||
|
await client.session.prompt({
|
||||||
|
path: { id: sessionId },
|
||||||
|
body: {
|
||||||
|
model: { providerID: "sandbox-agent", modelID: "mock" },
|
||||||
|
parts: [{ type: "text", text: "tool" }],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await collectIdle;
|
||||||
|
|
||||||
|
expect(idleEvents.length).toBe(1);
|
||||||
|
|
||||||
|
// All tool parts should have been emitted before idle
|
||||||
|
const toolParts = allEvents.filter(
|
||||||
|
(e) => e.type === "message.part.updated" && e.properties?.part?.type === "tool"
|
||||||
|
);
|
||||||
|
expect(toolParts.length).toBeGreaterThan(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,9 @@ use serde_json::Value;
|
||||||
|
|
||||||
use crate::amp as schema;
|
use crate::amp as schema;
|
||||||
use crate::{
|
use crate::{
|
||||||
ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole,
|
turn_completed_event, ContentPart, ErrorData, EventConversion, ItemDeltaData, ItemEventData,
|
||||||
ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy, UniversalEventData,
|
ItemKind, ItemRole, ItemStatus, SessionEndReason, SessionEndedData, TerminatedBy,
|
||||||
UniversalEventType, UniversalItem,
|
UniversalEventData, UniversalEventType, UniversalItem,
|
||||||
};
|
};
|
||||||
|
|
||||||
static TEMP_ID: AtomicU64 = AtomicU64::new(1);
|
static TEMP_ID: AtomicU64 = AtomicU64::new(1);
|
||||||
|
|
@ -99,6 +99,7 @@ pub fn event_to_universal(
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
schema::StreamJsonMessageType::Done => {
|
schema::StreamJsonMessageType::Done => {
|
||||||
|
events.push(turn_completed_event());
|
||||||
events.push(
|
events.push(
|
||||||
EventConversion::new(
|
EventConversion::new(
|
||||||
UniversalEventType::SessionEnded,
|
UniversalEventType::SessionEnded,
|
||||||
|
|
|
||||||
|
|
@ -3,9 +3,9 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus,
|
turn_completed_event, ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind,
|
||||||
PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, SessionStartedData,
|
ItemRole, ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus,
|
||||||
UniversalEventData, UniversalEventType, UniversalItem,
|
SessionStartedData, UniversalEventData, UniversalEventType, UniversalItem,
|
||||||
};
|
};
|
||||||
|
|
||||||
static TEMP_ID: AtomicU64 = AtomicU64::new(1);
|
static TEMP_ID: AtomicU64 = AtomicU64::new(1);
|
||||||
|
|
@ -420,10 +420,13 @@ fn result_event_to_universal(event: &Value, session_id: &str) -> Vec<EventConver
|
||||||
status: ItemStatus::Completed,
|
status: ItemStatus::Completed,
|
||||||
};
|
};
|
||||||
|
|
||||||
vec![EventConversion::new(
|
vec![
|
||||||
UniversalEventType::ItemCompleted,
|
EventConversion::new(
|
||||||
UniversalEventData::Item(ItemEventData { item: message_item }),
|
UniversalEventType::ItemCompleted,
|
||||||
)]
|
UniversalEventData::Item(ItemEventData { item: message_item }),
|
||||||
|
),
|
||||||
|
turn_completed_event(),
|
||||||
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
fn claude_message_id(event: &Value, session_id: &str) -> String {
|
fn claude_message_id(event: &Value, session_id: &str) -> String {
|
||||||
|
|
|
||||||
|
|
@ -317,6 +317,27 @@ impl EventConversion {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn turn_completed_event() -> EventConversion {
|
||||||
|
EventConversion::new(
|
||||||
|
UniversalEventType::ItemCompleted,
|
||||||
|
UniversalEventData::Item(ItemEventData {
|
||||||
|
item: UniversalItem {
|
||||||
|
item_id: String::new(),
|
||||||
|
native_item_id: None,
|
||||||
|
parent_id: None,
|
||||||
|
kind: ItemKind::Status,
|
||||||
|
role: Some(ItemRole::System),
|
||||||
|
content: vec![ContentPart::Status {
|
||||||
|
label: "turn.completed".to_string(),
|
||||||
|
detail: None,
|
||||||
|
}],
|
||||||
|
status: ItemStatus::Completed,
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.synthetic()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn item_from_text(role: ItemRole, text: String) -> UniversalItem {
|
pub fn item_from_text(role: ItemRole, text: String) -> UniversalItem {
|
||||||
UniversalItem {
|
UniversalItem {
|
||||||
item_id: String::new(),
|
item_id: String::new(),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue