mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-20 06:04:15 +00:00
fix(coding-agent): serialize session event handling to preserve message order (fixes #1717)
This commit is contained in:
parent
062f7ff52d
commit
dfc779faab
3 changed files with 152 additions and 2 deletions
|
|
@ -2,6 +2,10 @@
|
||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
- Fixed session message persistence ordering by serializing `AgentSession` event processing, preventing `toolResult` entries from being written before their corresponding assistant tool-call messages when extension handlers are asynchronous ([#1717](https://github.com/badlogic/pi-mono/issues/1717))
|
||||||
|
|
||||||
## [0.55.3] - 2026-02-27
|
## [0.55.3] - 2026-02-27
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
|
||||||
|
|
@ -220,6 +220,7 @@ export class AgentSession {
|
||||||
// Event subscription state
|
// Event subscription state
|
||||||
private _unsubscribeAgent?: () => void;
|
private _unsubscribeAgent?: () => void;
|
||||||
private _eventListeners: AgentSessionEventListener[] = [];
|
private _eventListeners: AgentSessionEventListener[] = [];
|
||||||
|
private _agentEventQueue: Promise<void> = Promise.resolve();
|
||||||
|
|
||||||
/** Tracks pending steering messages for UI display. Removed when delivered. */
|
/** Tracks pending steering messages for UI display. Removed when delivered. */
|
||||||
private _steeringMessages: string[] = [];
|
private _steeringMessages: string[] = [];
|
||||||
|
|
@ -314,7 +315,17 @@ export class AgentSession {
|
||||||
private _lastAssistantMessage: AssistantMessage | undefined = undefined;
|
private _lastAssistantMessage: AssistantMessage | undefined = undefined;
|
||||||
|
|
||||||
/** Internal handler for agent events - shared by subscribe and reconnect */
|
/** Internal handler for agent events - shared by subscribe and reconnect */
|
||||||
private _handleAgentEvent = async (event: AgentEvent): Promise<void> => {
|
private _handleAgentEvent = (event: AgentEvent): void => {
|
||||||
|
this._agentEventQueue = this._agentEventQueue.then(
|
||||||
|
() => this._processAgentEvent(event),
|
||||||
|
() => this._processAgentEvent(event),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Keep queue alive if an event handler fails
|
||||||
|
this._agentEventQueue.catch(() => {});
|
||||||
|
};
|
||||||
|
|
||||||
|
private async _processAgentEvent(event: AgentEvent): Promise<void> {
|
||||||
// When a user message starts, check if it's from either queue and remove it BEFORE emitting
|
// When a user message starts, check if it's from either queue and remove it BEFORE emitting
|
||||||
// This ensures the UI sees the updated queue state
|
// This ensures the UI sees the updated queue state
|
||||||
if (event.type === "message_start" && event.message.role === "user") {
|
if (event.type === "message_start" && event.message.role === "user") {
|
||||||
|
|
@ -393,7 +404,7 @@ export class AgentSession {
|
||||||
|
|
||||||
await this._checkCompaction(msg);
|
await this._checkCompaction(msg);
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
/** Resolve the pending retry promise */
|
/** Resolve the pending retry promise */
|
||||||
private _resolveRetry(): void {
|
private _resolveRetry(): void {
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import { tmpdir } from "node:os";
|
||||||
import { join } from "node:path";
|
import { join } from "node:path";
|
||||||
import { Agent } from "@mariozechner/pi-agent-core";
|
import { Agent } from "@mariozechner/pi-agent-core";
|
||||||
import { type AssistantMessage, type AssistantMessageEvent, EventStream, getModel } from "@mariozechner/pi-ai";
|
import { type AssistantMessage, type AssistantMessageEvent, EventStream, getModel } from "@mariozechner/pi-ai";
|
||||||
|
import { Type } from "@sinclair/typebox";
|
||||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||||
import { AgentSession } from "../src/core/agent-session.js";
|
import { AgentSession } from "../src/core/agent-session.js";
|
||||||
import { AuthStorage } from "../src/core/auth-storage.js";
|
import { AuthStorage } from "../src/core/auth-storage.js";
|
||||||
|
|
@ -214,4 +215,138 @@ describe("AgentSession concurrent prompt guard", () => {
|
||||||
// Second prompt should work
|
// Second prompt should work
|
||||||
await expect(session.prompt("Second message")).resolves.not.toThrow();
|
await expect(session.prompt("Second message")).resolves.not.toThrow();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("should persist message_end events in order with slow extension handlers", async () => {
|
||||||
|
const model = getModel("anthropic", "claude-sonnet-4-5")!;
|
||||||
|
const tool = {
|
||||||
|
name: "dummy",
|
||||||
|
description: "Dummy tool",
|
||||||
|
label: "dummy",
|
||||||
|
parameters: Type.Object({ q: Type.String() }),
|
||||||
|
execute: async (_toolCallId: string, params: unknown) => {
|
||||||
|
const q =
|
||||||
|
typeof params === "object" && params !== null && "q" in params
|
||||||
|
? String((params as { q: unknown }).q)
|
||||||
|
: "";
|
||||||
|
return {
|
||||||
|
content: [{ type: "text" as const, text: `result:${q}` }],
|
||||||
|
details: {},
|
||||||
|
};
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const agent = new Agent({
|
||||||
|
getApiKey: () => "test-key",
|
||||||
|
initialState: {
|
||||||
|
model,
|
||||||
|
systemPrompt: "Test",
|
||||||
|
tools: [tool],
|
||||||
|
},
|
||||||
|
streamFn: async (_model, context) => {
|
||||||
|
const stream = new MockAssistantStream();
|
||||||
|
queueMicrotask(() => {
|
||||||
|
const hasToolResult = context.messages.some((message) => message.role === "toolResult");
|
||||||
|
|
||||||
|
if (hasToolResult) {
|
||||||
|
const message: AssistantMessage = {
|
||||||
|
role: "assistant",
|
||||||
|
content: [{ type: "text", text: "done" }],
|
||||||
|
api: "anthropic-messages",
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "mock",
|
||||||
|
usage: {
|
||||||
|
input: 1,
|
||||||
|
output: 1,
|
||||||
|
cacheRead: 0,
|
||||||
|
cacheWrite: 0,
|
||||||
|
totalTokens: 2,
|
||||||
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||||
|
},
|
||||||
|
stopReason: "stop",
|
||||||
|
timestamp: Date.now(),
|
||||||
|
};
|
||||||
|
stream.push({ type: "start", partial: { ...message, content: [] } });
|
||||||
|
stream.push({ type: "done", reason: "stop", message });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const message: AssistantMessage = {
|
||||||
|
role: "assistant",
|
||||||
|
content: [
|
||||||
|
{ type: "text", text: "calling tool" },
|
||||||
|
{ type: "toolCall", id: "toolu_1", name: "dummy", arguments: { q: "x" } },
|
||||||
|
],
|
||||||
|
api: "anthropic-messages",
|
||||||
|
provider: "anthropic",
|
||||||
|
model: "mock",
|
||||||
|
usage: {
|
||||||
|
input: 1,
|
||||||
|
output: 1,
|
||||||
|
cacheRead: 0,
|
||||||
|
cacheWrite: 0,
|
||||||
|
totalTokens: 2,
|
||||||
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||||
|
},
|
||||||
|
stopReason: "toolUse",
|
||||||
|
timestamp: Date.now(),
|
||||||
|
};
|
||||||
|
|
||||||
|
stream.push({ type: "start", partial: { ...message, content: [] } });
|
||||||
|
stream.push({ type: "done", reason: "toolUse", message });
|
||||||
|
});
|
||||||
|
return stream;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const sessionManager = SessionManager.inMemory();
|
||||||
|
const settingsManager = SettingsManager.create(tempDir, tempDir);
|
||||||
|
const authStorage = AuthStorage.create(join(tempDir, "auth.json"));
|
||||||
|
const modelRegistry = new ModelRegistry(authStorage, tempDir);
|
||||||
|
authStorage.setRuntimeApiKey("anthropic", "test-key");
|
||||||
|
|
||||||
|
session = new AgentSession({
|
||||||
|
agent,
|
||||||
|
sessionManager,
|
||||||
|
settingsManager,
|
||||||
|
cwd: tempDir,
|
||||||
|
modelRegistry,
|
||||||
|
resourceLoader: createTestResourceLoader(),
|
||||||
|
baseToolsOverride: { dummy: tool },
|
||||||
|
});
|
||||||
|
|
||||||
|
const sessionWithRunner = session as unknown as {
|
||||||
|
_extensionRunner?: {
|
||||||
|
hasHandlers: (eventType: string) => boolean;
|
||||||
|
emit: (event: { type: string; message?: { role?: string } }) => Promise<void>;
|
||||||
|
emitInput: (
|
||||||
|
text: string,
|
||||||
|
images: unknown,
|
||||||
|
source: "interactive" | "rpc" | "extension",
|
||||||
|
) => Promise<{ action: "continue" }>;
|
||||||
|
emitBeforeAgentStart: (prompt: string, images: unknown, systemPrompt: string) => Promise<undefined>;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
sessionWithRunner._extensionRunner = {
|
||||||
|
hasHandlers: () => false,
|
||||||
|
emit: async (event) => {
|
||||||
|
if (event.type === "message_end" && event.message?.role === "assistant") {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 40));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emitInput: async () => ({ action: "continue" }),
|
||||||
|
emitBeforeAgentStart: async () => undefined,
|
||||||
|
};
|
||||||
|
|
||||||
|
await session.prompt("hi");
|
||||||
|
await session.agent.waitForIdle();
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const messageEntries = sessionManager.getEntries().filter((entry) => entry.type === "message");
|
||||||
|
expect(messageEntries.map((entry) => entry.message.role)).toEqual([
|
||||||
|
"user",
|
||||||
|
"assistant",
|
||||||
|
"toolResult",
|
||||||
|
"assistant",
|
||||||
|
]);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue