fix(opencode-compat): preserve chronological part ordering across interleaved tool and text streams (#133)

This commit is contained in:
NathanFlurry 2026-02-07 09:09:49 +00:00
parent 479c8468e8
commit 4bdd2416d1
No known key found for this signature in database
GPG key ID: 6A5F43A4F3241BCA
2 changed files with 144 additions and 54 deletions

View file

@ -225,6 +225,7 @@ impl OpenCodeQuestionRecord {
#[derive(Default, Clone)]
struct OpenCodeSessionRuntime {
last_user_message_id: Option<String>,
active_assistant_message_id: Option<String>,
last_agent: Option<String>,
last_model_provider: Option<String>,
last_model_id: Option<String>,
@ -242,6 +243,8 @@ struct OpenCodeSessionRuntime {
tool_args_by_call: HashMap<String, String>,
/// Tool calls that have been requested but not yet resolved.
open_tool_calls: HashSet<String>,
/// Assistant messages that have streamed text deltas.
messages_with_text_deltas: HashSet<String>,
}
#[derive(Clone, Debug)]
@ -1723,11 +1726,8 @@ async fn upsert_message_part(
} else {
record.parts.push(part);
}
record.parts.sort_by(|a, b| {
let a_id = a.get("id").and_then(|v| v.as_str()).unwrap_or("");
let b_id = b.get("id").and_then(|v| v.as_str()).unwrap_or("");
a_id.cmp(b_id)
});
// Preserve insertion order so UI rendering matches stream chronology.
// Sorting by synthetic part IDs can reorder text/tool parts unexpectedly.
}
async fn session_directory(state: &OpenCodeState, session_id: &str) -> String {
@ -1826,7 +1826,11 @@ async fn apply_universal_event(state: Arc<OpenCodeAppState>, event: UniversalEve
if label == "turn.completed" || label == "session.idle" {
let runtime = state
.opencode
.update_runtime(&event.session_id, |_| {})
.update_runtime(&event.session_id, |runtime| {
if runtime.open_tool_calls.is_empty() {
runtime.active_assistant_message_id = None;
}
})
.await;
if !runtime.open_tool_calls.is_empty() {
return;
@ -2129,34 +2133,31 @@ async fn apply_item_event(
if runtime.last_user_message_id.is_none() {
runtime.last_user_message_id = parent_id.clone();
}
runtime.active_assistant_message_id = Some(message_id.clone());
})
.await;
if let Some(text) = extract_text_from_content(&item.content) {
let part_id = runtime
.part_id_by_message
.entry(message_id.clone())
.or_insert_with(|| format!("{}_text", message_id))
.clone();
if event.event_type == UniversalEventType::ItemStarted {
// For ItemStarted, only store the text in runtime as the initial value
// without emitting a part event. Deltas will handle streaming, and
// ItemCompleted will emit the final text part.
// Reset streaming text state for a new assistant item.
let _ = state
.opencode
.update_runtime(&session_id, |runtime| {
runtime
.text_by_message
.insert(message_id.clone(), String::new());
runtime
.part_id_by_message
.insert(message_id.clone(), part_id.clone());
runtime.text_by_message.remove(&message_id);
runtime.part_id_by_message.remove(&message_id);
runtime.messages_with_text_deltas.remove(&message_id);
})
.await;
} else {
// For ItemCompleted, emit the final text part with the complete text.
// Use the accumulated text from deltas if available, otherwise use
// the text from the completed event.
// If text was streamed via deltas, keep segment ordering as emitted and
// avoid replacing the latest segment with full completed text.
let has_streamed_text = runtime.messages_with_text_deltas.contains(&message_id);
if !has_streamed_text {
let part_id = runtime
.part_id_by_message
.get(&message_id)
.cloned()
.unwrap_or_else(|| next_id("part_", &PART_COUNTER));
let final_text = runtime
.text_by_message
.get(&message_id)
@ -2181,6 +2182,7 @@ async fn apply_item_event(
.await;
}
}
}
for part in item.content.iter() {
match part {
@ -2244,6 +2246,9 @@ async fn apply_item_event(
.tool_args_by_call
.insert(call_id.clone(), arguments.clone());
runtime.open_tool_calls.insert(call_id.clone());
// Start a new text segment after tool activity.
runtime.part_id_by_message.remove(&message_id);
runtime.text_by_message.remove(&message_id);
})
.await;
}
@ -2302,6 +2307,9 @@ async fn apply_item_event(
.tool_message_by_call
.insert(call_id.clone(), message_id.clone());
runtime.open_tool_calls.remove(call_id);
// Start a new text segment after tool activity.
runtime.part_id_by_message.remove(&message_id);
runtime.text_by_message.remove(&message_id);
})
.await;
}
@ -2375,6 +2383,7 @@ async fn apply_tool_item_event(
.and_then(|key| runtime.message_id_for_item.get(&key).cloned())
})
.or_else(|| runtime.tool_message_by_call.get(&call_id).cloned())
.or_else(|| runtime.active_assistant_message_id.clone())
{
message_id = Some(existing);
} else {
@ -2416,7 +2425,7 @@ async fn apply_tool_item_event(
let worktree = state.opencode.worktree_for(&directory);
let now = state.opencode.now_ms();
let mut info = build_assistant_message(
let info = build_assistant_message(
&session_id,
&message_id,
parent_id.as_deref().unwrap_or(""),
@ -2427,13 +2436,6 @@ async fn apply_tool_item_event(
&provider_id,
&model_id,
);
if event.event_type == UniversalEventType::ItemCompleted {
if let Some(obj) = info.as_object_mut() {
if let Some(time) = obj.get_mut("time").and_then(|v| v.as_object_mut()) {
time.insert("completed".to_string(), json!(now));
}
}
}
upsert_message_info(&state.opencode, &session_id, info.clone()).await;
state
.opencode
@ -2575,6 +2577,9 @@ async fn apply_tool_item_event(
{
runtime.open_tool_calls.remove(&call_id);
}
// Start a new text segment after tool activity.
runtime.part_id_by_message.remove(&message_id);
runtime.text_by_message.remove(&message_id);
})
.await;
}
@ -2618,6 +2623,7 @@ async fn apply_item_delta(
.clone()
.and_then(|key| runtime.message_id_for_item.get(&key).cloned())
})
.or_else(|| runtime.active_assistant_message_id.clone())
{
message_id = Some(existing);
} else {
@ -2679,7 +2685,7 @@ async fn apply_item_delta(
.part_id_by_message
.get(&message_id)
.cloned()
.unwrap_or_else(|| format!("{}_text", message_id));
.unwrap_or_else(|| next_id("part_", &PART_COUNTER));
let part = build_text_part_with_id(&session_id, &message_id, &part_id, &text);
upsert_message_part(&state.opencode, &session_id, &message_id, part.clone()).await;
state.opencode.emit_event(part_event_with_delta(
@ -2694,6 +2700,9 @@ async fn apply_item_delta(
runtime
.part_id_by_message
.insert(message_id.clone(), part_id.clone());
runtime
.messages_with_text_deltas
.insert(message_id.clone());
})
.await;
}
@ -3706,6 +3715,7 @@ async fn oc_session_message_create(
.opencode
.update_runtime(&session_id, |runtime| {
runtime.last_user_message_id = Some(user_message_id.clone());
runtime.active_assistant_message_id = None;
runtime.last_agent = Some(agent_mode.clone());
runtime.last_model_provider = Some(provider_id.clone());
runtime.last_model_id = Some(model_id.clone());

View file

@ -238,5 +238,85 @@ describe("OpenCode-compatible Event Streaming", () => {
);
expect(toolParts.length).toBeGreaterThan(0);
});
it("should preserve part order based on first stream appearance", async () => {
const session = await client.session.create();
const sessionId = session.data?.id!;
const eventStream = await client.event.subscribe();
const seenPartIds: string[] = [];
let targetMessageId: string | null = null;
const collectIdle = new Promise<void>((resolve, reject) => {
let lingerTimer: ReturnType<typeof setTimeout> | null = null;
const timeout = setTimeout(() => reject(new Error("Timed out waiting for session.idle")), 15_000);
(async () => {
try {
for await (const event of (eventStream as any).stream) {
if (event?.properties?.sessionID !== sessionId) {
continue;
}
if (event.type === "message.part.updated") {
const messageId = event.properties?.messageID;
const partId = event.properties?.part?.id;
const partType = event.properties?.part?.type;
if (!targetMessageId && partType === "tool" && typeof messageId === "string") {
targetMessageId = messageId;
}
if (
targetMessageId &&
messageId === targetMessageId &&
typeof partId === "string" &&
!seenPartIds.includes(partId)
) {
seenPartIds.push(partId);
}
}
if (event.type === "session.idle") {
if (!lingerTimer) {
lingerTimer = setTimeout(() => {
clearTimeout(timeout);
resolve();
}, 500);
}
}
}
} catch {
// Stream ended
}
})();
});
await client.session.prompt({
path: { id: sessionId },
body: {
model: { providerID: "mock", modelID: "mock" },
parts: [{ type: "text", text: "tool" }],
},
});
await collectIdle;
expect(targetMessageId).toBeTruthy();
expect(seenPartIds.length).toBeGreaterThan(0);
const response = await fetch(
`${handle.baseUrl}/opencode/session/${sessionId}/message/${targetMessageId}`,
{
headers: { Authorization: `Bearer ${handle.token}` },
}
);
expect(response.ok).toBe(true);
const message = (await response.json()) as any;
const returnedPartIds = (message?.parts ?? [])
.map((part: any) => part?.id)
.filter((id: any) => typeof id === "string");
const expectedSet = new Set(seenPartIds);
const returnedFiltered = returnedPartIds.filter((id: string) => expectedSet.has(id));
expect(returnedFiltered).toEqual(seenPartIds);
});
});
});