diff --git a/server/packages/sandbox-agent/src/opencode_compat.rs b/server/packages/sandbox-agent/src/opencode_compat.rs index e8b6ba5..51d1b53 100644 --- a/server/packages/sandbox-agent/src/opencode_compat.rs +++ b/server/packages/sandbox-agent/src/opencode_compat.rs @@ -225,6 +225,7 @@ impl OpenCodeQuestionRecord { #[derive(Default, Clone)] struct OpenCodeSessionRuntime { last_user_message_id: Option, + active_assistant_message_id: Option, last_agent: Option, last_model_provider: Option, last_model_id: Option, @@ -242,6 +243,8 @@ struct OpenCodeSessionRuntime { tool_args_by_call: HashMap, /// Tool calls that have been requested but not yet resolved. open_tool_calls: HashSet, + /// Assistant messages that have streamed text deltas. + messages_with_text_deltas: HashSet, } #[derive(Clone, Debug)] @@ -1723,11 +1726,8 @@ async fn upsert_message_part( } else { record.parts.push(part); } - record.parts.sort_by(|a, b| { - let a_id = a.get("id").and_then(|v| v.as_str()).unwrap_or(""); - let b_id = b.get("id").and_then(|v| v.as_str()).unwrap_or(""); - a_id.cmp(b_id) - }); + // Preserve insertion order so UI rendering matches stream chronology. + // Sorting by synthetic part IDs can reorder text/tool parts unexpectedly. } async fn session_directory(state: &OpenCodeState, session_id: &str) -> String { @@ -1826,7 +1826,11 @@ async fn apply_universal_event(state: Arc, event: UniversalEve if label == "turn.completed" || label == "session.idle" { let runtime = state .opencode - .update_runtime(&event.session_id, |_| {}) + .update_runtime(&event.session_id, |runtime| { + if runtime.open_tool_calls.is_empty() { + runtime.active_assistant_message_id = None; + } + }) .await; if !runtime.open_tool_calls.is_empty() { return; @@ -2129,56 +2133,54 @@ async fn apply_item_event( if runtime.last_user_message_id.is_none() { runtime.last_user_message_id = parent_id.clone(); } + runtime.active_assistant_message_id = Some(message_id.clone()); }) .await; if let Some(text) = extract_text_from_content(&item.content) { - let part_id = runtime - .part_id_by_message - .entry(message_id.clone()) - .or_insert_with(|| format!("{}_text", message_id)) - .clone(); if event.event_type == UniversalEventType::ItemStarted { - // For ItemStarted, only store the text in runtime as the initial value - // without emitting a part event. Deltas will handle streaming, and - // ItemCompleted will emit the final text part. + // Reset streaming text state for a new assistant item. let _ = state .opencode .update_runtime(&session_id, |runtime| { - runtime - .text_by_message - .insert(message_id.clone(), String::new()); - runtime - .part_id_by_message - .insert(message_id.clone(), part_id.clone()); + runtime.text_by_message.remove(&message_id); + runtime.part_id_by_message.remove(&message_id); + runtime.messages_with_text_deltas.remove(&message_id); }) .await; } else { - // For ItemCompleted, emit the final text part with the complete text. - // Use the accumulated text from deltas if available, otherwise use - // the text from the completed event. - let final_text = runtime - .text_by_message - .get(&message_id) - .filter(|t| !t.is_empty()) - .cloned() - .unwrap_or_else(|| text.clone()); - let part = build_text_part_with_id(&session_id, &message_id, &part_id, &final_text); - upsert_message_part(&state.opencode, &session_id, &message_id, part.clone()).await; - state - .opencode - .emit_event(part_event("message.part.updated", &part)); - let _ = state - .opencode - .update_runtime(&session_id, |runtime| { - runtime - .text_by_message - .insert(message_id.clone(), final_text.clone()); - runtime - .part_id_by_message - .insert(message_id.clone(), part_id.clone()); - }) - .await; + // If text was streamed via deltas, keep segment ordering as emitted and + // avoid replacing the latest segment with full completed text. + let has_streamed_text = runtime.messages_with_text_deltas.contains(&message_id); + if !has_streamed_text { + let part_id = runtime + .part_id_by_message + .get(&message_id) + .cloned() + .unwrap_or_else(|| next_id("part_", &PART_COUNTER)); + let final_text = runtime + .text_by_message + .get(&message_id) + .filter(|t| !t.is_empty()) + .cloned() + .unwrap_or_else(|| text.clone()); + let part = build_text_part_with_id(&session_id, &message_id, &part_id, &final_text); + upsert_message_part(&state.opencode, &session_id, &message_id, part.clone()).await; + state + .opencode + .emit_event(part_event("message.part.updated", &part)); + let _ = state + .opencode + .update_runtime(&session_id, |runtime| { + runtime + .text_by_message + .insert(message_id.clone(), final_text.clone()); + runtime + .part_id_by_message + .insert(message_id.clone(), part_id.clone()); + }) + .await; + } } } @@ -2244,6 +2246,9 @@ async fn apply_item_event( .tool_args_by_call .insert(call_id.clone(), arguments.clone()); runtime.open_tool_calls.insert(call_id.clone()); + // Start a new text segment after tool activity. + runtime.part_id_by_message.remove(&message_id); + runtime.text_by_message.remove(&message_id); }) .await; } @@ -2302,6 +2307,9 @@ async fn apply_item_event( .tool_message_by_call .insert(call_id.clone(), message_id.clone()); runtime.open_tool_calls.remove(call_id); + // Start a new text segment after tool activity. + runtime.part_id_by_message.remove(&message_id); + runtime.text_by_message.remove(&message_id); }) .await; } @@ -2375,6 +2383,7 @@ async fn apply_tool_item_event( .and_then(|key| runtime.message_id_for_item.get(&key).cloned()) }) .or_else(|| runtime.tool_message_by_call.get(&call_id).cloned()) + .or_else(|| runtime.active_assistant_message_id.clone()) { message_id = Some(existing); } else { @@ -2416,7 +2425,7 @@ async fn apply_tool_item_event( let worktree = state.opencode.worktree_for(&directory); let now = state.opencode.now_ms(); - let mut info = build_assistant_message( + let info = build_assistant_message( &session_id, &message_id, parent_id.as_deref().unwrap_or(""), @@ -2427,13 +2436,6 @@ async fn apply_tool_item_event( &provider_id, &model_id, ); - if event.event_type == UniversalEventType::ItemCompleted { - if let Some(obj) = info.as_object_mut() { - if let Some(time) = obj.get_mut("time").and_then(|v| v.as_object_mut()) { - time.insert("completed".to_string(), json!(now)); - } - } - } upsert_message_info(&state.opencode, &session_id, info.clone()).await; state .opencode @@ -2575,6 +2577,9 @@ async fn apply_tool_item_event( { runtime.open_tool_calls.remove(&call_id); } + // Start a new text segment after tool activity. + runtime.part_id_by_message.remove(&message_id); + runtime.text_by_message.remove(&message_id); }) .await; } @@ -2618,6 +2623,7 @@ async fn apply_item_delta( .clone() .and_then(|key| runtime.message_id_for_item.get(&key).cloned()) }) + .or_else(|| runtime.active_assistant_message_id.clone()) { message_id = Some(existing); } else { @@ -2679,7 +2685,7 @@ async fn apply_item_delta( .part_id_by_message .get(&message_id) .cloned() - .unwrap_or_else(|| format!("{}_text", message_id)); + .unwrap_or_else(|| next_id("part_", &PART_COUNTER)); let part = build_text_part_with_id(&session_id, &message_id, &part_id, &text); upsert_message_part(&state.opencode, &session_id, &message_id, part.clone()).await; state.opencode.emit_event(part_event_with_delta( @@ -2694,6 +2700,9 @@ async fn apply_item_delta( runtime .part_id_by_message .insert(message_id.clone(), part_id.clone()); + runtime + .messages_with_text_deltas + .insert(message_id.clone()); }) .await; } @@ -3706,6 +3715,7 @@ async fn oc_session_message_create( .opencode .update_runtime(&session_id, |runtime| { runtime.last_user_message_id = Some(user_message_id.clone()); + runtime.active_assistant_message_id = None; runtime.last_agent = Some(agent_mode.clone()); runtime.last_model_provider = Some(provider_id.clone()); runtime.last_model_id = Some(model_id.clone()); diff --git a/server/packages/sandbox-agent/tests/opencode-compat/events.test.ts b/server/packages/sandbox-agent/tests/opencode-compat/events.test.ts index 2140ef3..61577eb 100644 --- a/server/packages/sandbox-agent/tests/opencode-compat/events.test.ts +++ b/server/packages/sandbox-agent/tests/opencode-compat/events.test.ts @@ -238,5 +238,85 @@ describe("OpenCode-compatible Event Streaming", () => { ); expect(toolParts.length).toBeGreaterThan(0); }); + + it("should preserve part order based on first stream appearance", async () => { + const session = await client.session.create(); + const sessionId = session.data?.id!; + + const eventStream = await client.event.subscribe(); + const seenPartIds: string[] = []; + let targetMessageId: string | null = null; + + const collectIdle = new Promise((resolve, reject) => { + let lingerTimer: ReturnType | null = null; + const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000); + (async () => { + try { + for await (const event of (eventStream as any).stream) { + if (event?.properties?.sessionID !== sessionId) { + continue; + } + + if (event.type === "message.part.updated") { + const messageId = event.properties?.messageID; + const partId = event.properties?.part?.id; + const partType = event.properties?.part?.type; + if (!targetMessageId && partType === "tool" && typeof messageId === "string") { + targetMessageId = messageId; + } + if ( + targetMessageId && + messageId === targetMessageId && + typeof partId === "string" && + !seenPartIds.includes(partId) + ) { + seenPartIds.push(partId); + } + } + + if (event.type === "session.idle") { + if (!lingerTimer) { + lingerTimer = setTimeout(() => { + clearTimeout(timeout); + resolve(); + }, 500); + } + } + } + } catch { + // Stream ended + } + })(); + }); + + await client.session.prompt({ + path: { id: sessionId }, + body: { + model: { providerID: "mock", modelID: "mock" }, + parts: [{ type: "text", text: "tool" }], + }, + }); + + await collectIdle; + + expect(targetMessageId).toBeTruthy(); + expect(seenPartIds.length).toBeGreaterThan(0); + + const response = await fetch( + `${handle.baseUrl}/opencode/session/${sessionId}/message/${targetMessageId}`, + { + headers: { Authorization: `Bearer ${handle.token}` }, + } + ); + expect(response.ok).toBe(true); + const message = (await response.json()) as any; + const returnedPartIds = (message?.parts ?? []) + .map((part: any) => part?.id) + .filter((id: any) => typeof id === "string"); + + const expectedSet = new Set(seenPartIds); + const returnedFiltered = returnedPartIds.filter((id: string) => expectedSet.has(id)); + expect(returnedFiltered).toEqual(seenPartIds); + }); }); });