From dfc779faab24478fd4f6c608d78efe760a51160a Mon Sep 17 00:00:00 2001 From: Mario Zechner Date: Mon, 2 Mar 2026 20:48:22 +0100 Subject: [PATCH] fix(coding-agent): serialize session event handling to preserve message order (fixes #1717) --- packages/coding-agent/CHANGELOG.md | 4 + .../coding-agent/src/core/agent-session.ts | 15 +- .../test/agent-session-concurrent.test.ts | 135 ++++++++++++++++++ 3 files changed, 152 insertions(+), 2 deletions(-) diff --git a/packages/coding-agent/CHANGELOG.md b/packages/coding-agent/CHANGELOG.md index 8948106a..d8802825 100644 --- a/packages/coding-agent/CHANGELOG.md +++ b/packages/coding-agent/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Fixed + +- Fixed session message persistence ordering by serializing `AgentSession` event processing, preventing `toolResult` entries from being written before their corresponding assistant tool-call messages when extension handlers are asynchronous ([#1717](https://github.com/badlogic/pi-mono/issues/1717)) + ## [0.55.3] - 2026-02-27 ### Fixed diff --git a/packages/coding-agent/src/core/agent-session.ts b/packages/coding-agent/src/core/agent-session.ts index c9bc97b5..d6f7b955 100644 --- a/packages/coding-agent/src/core/agent-session.ts +++ b/packages/coding-agent/src/core/agent-session.ts @@ -220,6 +220,7 @@ export class AgentSession { // Event subscription state private _unsubscribeAgent?: () => void; private _eventListeners: AgentSessionEventListener[] = []; + private _agentEventQueue: Promise = Promise.resolve(); /** Tracks pending steering messages for UI display. Removed when delivered. */ private _steeringMessages: string[] = []; @@ -314,7 +315,17 @@ export class AgentSession { private _lastAssistantMessage: AssistantMessage | undefined = undefined; /** Internal handler for agent events - shared by subscribe and reconnect */ - private _handleAgentEvent = async (event: AgentEvent): Promise => { + private _handleAgentEvent = (event: AgentEvent): void => { + this._agentEventQueue = this._agentEventQueue.then( + () => this._processAgentEvent(event), + () => this._processAgentEvent(event), + ); + + // Keep queue alive if an event handler fails + this._agentEventQueue.catch(() => {}); + }; + + private async _processAgentEvent(event: AgentEvent): Promise { // When a user message starts, check if it's from either queue and remove it BEFORE emitting // This ensures the UI sees the updated queue state if (event.type === "message_start" && event.message.role === "user") { @@ -393,7 +404,7 @@ export class AgentSession { await this._checkCompaction(msg); } - }; + } /** Resolve the pending retry promise */ private _resolveRetry(): void { diff --git a/packages/coding-agent/test/agent-session-concurrent.test.ts b/packages/coding-agent/test/agent-session-concurrent.test.ts index 7e9fc17d..61eecd08 100644 --- a/packages/coding-agent/test/agent-session-concurrent.test.ts +++ b/packages/coding-agent/test/agent-session-concurrent.test.ts @@ -7,6 +7,7 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import { Agent } from "@mariozechner/pi-agent-core"; import { type AssistantMessage, type AssistantMessageEvent, EventStream, getModel } from "@mariozechner/pi-ai"; +import { Type } from "@sinclair/typebox"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { AgentSession } from "../src/core/agent-session.js"; import { AuthStorage } from "../src/core/auth-storage.js"; @@ -214,4 +215,138 @@ describe("AgentSession concurrent prompt guard", () => { // Second prompt should work await expect(session.prompt("Second message")).resolves.not.toThrow(); }); + + it("should persist message_end events in order with slow extension handlers", async () => { + const model = getModel("anthropic", "claude-sonnet-4-5")!; + const tool = { + name: "dummy", + description: "Dummy tool", + label: "dummy", + parameters: Type.Object({ q: Type.String() }), + execute: async (_toolCallId: string, params: unknown) => { + const q = + typeof params === "object" && params !== null && "q" in params + ? String((params as { q: unknown }).q) + : ""; + return { + content: [{ type: "text" as const, text: `result:${q}` }], + details: {}, + }; + }, + }; + + const agent = new Agent({ + getApiKey: () => "test-key", + initialState: { + model, + systemPrompt: "Test", + tools: [tool], + }, + streamFn: async (_model, context) => { + const stream = new MockAssistantStream(); + queueMicrotask(() => { + const hasToolResult = context.messages.some((message) => message.role === "toolResult"); + + if (hasToolResult) { + const message: AssistantMessage = { + role: "assistant", + content: [{ type: "text", text: "done" }], + api: "anthropic-messages", + provider: "anthropic", + model: "mock", + usage: { + input: 1, + output: 1, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 2, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop", + timestamp: Date.now(), + }; + stream.push({ type: "start", partial: { ...message, content: [] } }); + stream.push({ type: "done", reason: "stop", message }); + return; + } + + const message: AssistantMessage = { + role: "assistant", + content: [ + { type: "text", text: "calling tool" }, + { type: "toolCall", id: "toolu_1", name: "dummy", arguments: { q: "x" } }, + ], + api: "anthropic-messages", + provider: "anthropic", + model: "mock", + usage: { + input: 1, + output: 1, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 2, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "toolUse", + timestamp: Date.now(), + }; + + stream.push({ type: "start", partial: { ...message, content: [] } }); + stream.push({ type: "done", reason: "toolUse", message }); + }); + return stream; + }, + }); + + const sessionManager = SessionManager.inMemory(); + const settingsManager = SettingsManager.create(tempDir, tempDir); + const authStorage = AuthStorage.create(join(tempDir, "auth.json")); + const modelRegistry = new ModelRegistry(authStorage, tempDir); + authStorage.setRuntimeApiKey("anthropic", "test-key"); + + session = new AgentSession({ + agent, + sessionManager, + settingsManager, + cwd: tempDir, + modelRegistry, + resourceLoader: createTestResourceLoader(), + baseToolsOverride: { dummy: tool }, + }); + + const sessionWithRunner = session as unknown as { + _extensionRunner?: { + hasHandlers: (eventType: string) => boolean; + emit: (event: { type: string; message?: { role?: string } }) => Promise; + emitInput: ( + text: string, + images: unknown, + source: "interactive" | "rpc" | "extension", + ) => Promise<{ action: "continue" }>; + emitBeforeAgentStart: (prompt: string, images: unknown, systemPrompt: string) => Promise; + }; + }; + sessionWithRunner._extensionRunner = { + hasHandlers: () => false, + emit: async (event) => { + if (event.type === "message_end" && event.message?.role === "assistant") { + await new Promise((resolve) => setTimeout(resolve, 40)); + } + }, + emitInput: async () => ({ action: "continue" }), + emitBeforeAgentStart: async () => undefined, + }; + + await session.prompt("hi"); + await session.agent.waitForIdle(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + const messageEntries = sessionManager.getEntries().filter((entry) => entry.type === "message"); + expect(messageEntries.map((entry) => entry.message.role)).toEqual([ + "user", + "assistant", + "toolResult", + "assistant", + ]); + }); });