fix(agent,coding-agent): resume queued messages after auto-compaction

This commit is contained in:
Mario Zechner 2026-02-06 11:30:43 +01:00
parent 703ee26625
commit b050c582a1
7 changed files with 247 additions and 27 deletions

View file

@@ -23,7 +23,7 @@ read README.md, then ask which module(s) to work on. Based on the answer, read t
- After code changes (not documentation changes): `npm run check` (get full output, no tail). Fix all errors, warnings, and infos before committing.
- Note: `npm run check` does not run tests.
- NEVER run: `npm run dev`, `npm run build`, `npm test`
- Only run specific tests if user instructs: `npm test -- test/specific.test.ts`
- Only run specific tests if user instructs: `npx tsx ./node_modules/vitest/vitest.mjs --run test/specific.test.ts`
- Run tests from the package root, not the repo root.
- When writing tests, run them, identify issues in either the test or implementation, and iterate until fixed.
- NEVER commit unless user asks

View file

@@ -2,6 +2,10 @@
## [Unreleased]
### Fixed
- Fixed `continue()` to resume queued steering/follow-up messages when context currently ends in an assistant message, and preserved one-at-a-time steering ordering during assistant-tail resumes ([#1312](https://github.com/badlogic/pi-mono/pull/1312) by [@ferologics](https://github.com/ferologics))
## [0.52.6] - 2026-02-05
## [0.52.5] - 2026-02-05

View file

@@ -252,6 +252,40 @@ export class Agent {
this.followUpQueue = [];
}
/** Reports whether any steering or follow-up messages are still waiting in the queues. */
hasQueuedMessages(): boolean {
	return this.steeringQueue.length + this.followUpQueue.length > 0;
}
/**
 * Removes and returns pending steering messages according to the configured
 * steering mode: "one-at-a-time" yields at most the first queued message,
 * while any other mode drains the entire queue in FIFO order.
 */
private dequeueSteeringMessages(): AgentMessage[] {
	if (this.steeringMode === "one-at-a-time") {
		const [head, ...rest] = this.steeringQueue;
		if (head === undefined) return [];
		this.steeringQueue = rest;
		return [head];
	}
	const drained = this.steeringQueue;
	this.steeringQueue = [];
	return drained;
}
/**
 * Removes and returns pending follow-up messages according to the configured
 * follow-up mode: "one-at-a-time" yields at most the first queued message,
 * while any other mode drains the entire queue in FIFO order.
 */
private dequeueFollowUpMessages(): AgentMessage[] {
	if (this.followUpMode === "one-at-a-time") {
		const [head, ...rest] = this.followUpQueue;
		if (head === undefined) return [];
		this.followUpQueue = rest;
		return [head];
	}
	const drained = this.followUpQueue;
	this.followUpQueue = [];
	return drained;
}
/** Clears the conversation context by replacing the message list with a fresh empty array. */
clearMessages() {
this._state.messages = [];
}
@@ -310,7 +344,9 @@ export class Agent {
await this._runLoop(msgs);
}
/** Continue from current context (for retry after overflow) */
/**
* Continue from current context (used for retries and resuming queued messages).
*/
async continue() {
if (this._state.isStreaming) {
throw new Error("Agent is already processing. Wait for completion before continuing.");
@@ -321,6 +357,18 @@ export class Agent {
throw new Error("No messages to continue from");
}
if (messages[messages.length - 1].role === "assistant") {
const queuedSteering = this.dequeueSteeringMessages();
if (queuedSteering.length > 0) {
await this._runLoop(queuedSteering, { skipInitialSteeringPoll: true });
return;
}
const queuedFollowUp = this.dequeueFollowUpMessages();
if (queuedFollowUp.length > 0) {
await this._runLoop(queuedFollowUp);
return;
}
throw new Error("Cannot continue from message role: assistant");
}
@@ -332,7 +380,7 @@ export class Agent {
* If messages are provided, starts a new conversation turn with those messages.
* Otherwise, continues from existing context.
*/
private async _runLoop(messages?: AgentMessage[]) {
private async _runLoop(messages?: AgentMessage[], options?: { skipInitialSteeringPoll?: boolean }) {
const model = this._state.model;
if (!model) throw new Error("No model configured");
@@ -353,6 +401,8 @@ export class Agent {
tools: this._state.tools,
};
let skipInitialSteeringPoll = options?.skipInitialSteeringPoll === true;
const config: AgentLoopConfig = {
model,
reasoning,
@@ -363,33 +413,13 @@ export class Agent {
transformContext: this.transformContext,
getApiKey: this.getApiKey,
getSteeringMessages: async () => {
if (this.steeringMode === "one-at-a-time") {
if (this.steeringQueue.length > 0) {
const first = this.steeringQueue[0];
this.steeringQueue = this.steeringQueue.slice(1);
return [first];
}
if (skipInitialSteeringPoll) {
skipInitialSteeringPoll = false;
return [];
} else {
const steering = this.steeringQueue.slice();
this.steeringQueue = [];
return steering;
}
},
getFollowUpMessages: async () => {
if (this.followUpMode === "one-at-a-time") {
if (this.followUpQueue.length > 0) {
const first = this.followUpQueue[0];
this.followUpQueue = this.followUpQueue.slice(1);
return [first];
}
return [];
} else {
const followUp = this.followUpQueue.slice();
this.followUpQueue = [];
return followUp;
}
return this.dequeueSteeringMessages();
},
getFollowUpMessages: async () => this.dequeueFollowUpMessages(),
};
let partial: AgentMessage | null = null;

View file

@@ -230,6 +230,88 @@ describe("Agent", () => {
await firstPrompt.catch(() => {});
});
it("continue() should process queued follow-up messages after an assistant turn", async () => {
const agent = new Agent({
streamFn: () => {
const stream = new MockAssistantStream();
queueMicrotask(() => {
stream.push({ type: "done", reason: "stop", message: createAssistantMessage("Processed") });
});
return stream;
},
});
agent.replaceMessages([
{
role: "user",
content: [{ type: "text", text: "Initial" }],
timestamp: Date.now() - 10,
},
createAssistantMessage("Initial response"),
]);
agent.followUp({
role: "user",
content: [{ type: "text", text: "Queued follow-up" }],
timestamp: Date.now(),
});
await expect(agent.continue()).resolves.toBeUndefined();
const hasQueuedFollowUp = agent.state.messages.some((message) => {
if (message.role !== "user") return false;
if (typeof message.content === "string") return message.content === "Queued follow-up";
return message.content.some((part) => part.type === "text" && part.text === "Queued follow-up");
});
expect(hasQueuedFollowUp).toBe(true);
expect(agent.state.messages[agent.state.messages.length - 1].role).toBe("assistant");
});
it("continue() should keep one-at-a-time steering semantics from assistant tail", async () => {
let responseCount = 0;
const agent = new Agent({
streamFn: () => {
const stream = new MockAssistantStream();
responseCount++;
queueMicrotask(() => {
stream.push({
type: "done",
reason: "stop",
message: createAssistantMessage(`Processed ${responseCount}`),
});
});
return stream;
},
});
agent.replaceMessages([
{
role: "user",
content: [{ type: "text", text: "Initial" }],
timestamp: Date.now() - 10,
},
createAssistantMessage("Initial response"),
]);
agent.steer({
role: "user",
content: [{ type: "text", text: "Steering 1" }],
timestamp: Date.now(),
});
agent.steer({
role: "user",
content: [{ type: "text", text: "Steering 2" }],
timestamp: Date.now() + 1,
});
await expect(agent.continue()).resolves.toBeUndefined();
const recentMessages = agent.state.messages.slice(-4);
expect(recentMessages.map((m) => m.role)).toEqual(["user", "assistant", "user", "assistant"]);
expect(responseCount).toBe(2);
});
it("forwards sessionId to streamFn options", async () => {
let receivedSessionId: string | undefined;
const agent = new Agent({

View file

@@ -5,6 +5,7 @@
### Fixed
- Fixed extra spacing between thinking-only assistant content and subsequent tool execution blocks when assistant messages contain no text
- Fixed queued steering/follow-up/custom messages remaining stuck after threshold auto-compaction by resuming the agent loop when Agent-level queues still contain pending messages ([#1312](https://github.com/badlogic/pi-mono/pull/1312) by [@ferologics](https://github.com/ferologics))
## [0.52.6] - 2026-02-05

View file

@@ -1671,6 +1671,12 @@ export class AgentSession {
this.agent.replaceMessages(messages.slice(0, -1));
}
setTimeout(() => {
this.agent.continue().catch(() => {});
}, 100);
} else if (this.agent.hasQueuedMessages()) {
// Auto-compaction can complete while follow-up/steering/custom messages are waiting.
// Kick the loop so queued messages are actually delivered.
setTimeout(() => {
this.agent.continue().catch(() => {});
}, 100);

View file

@@ -0,0 +1,97 @@
import { existsSync, mkdirSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { Agent } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { AgentSession } from "../src/core/agent-session.js";
import { AuthStorage } from "../src/core/auth-storage.js";
import { ModelRegistry } from "../src/core/model-registry.js";
import { SessionManager } from "../src/core/session-manager.js";
import { SettingsManager } from "../src/core/settings-manager.js";
import { createTestResourceLoader } from "./utilities.js";
vi.mock("../src/core/compaction/index.js", () => ({
calculateContextTokens: () => 0,
collectEntriesForBranchSummary: () => ({ entries: [], commonAncestorId: null }),
compact: async () => ({
summary: "compacted",
firstKeptEntryId: "entry-1",
tokensBefore: 100,
details: {},
}),
estimateContextTokens: () => ({ tokens: 0, usageTokens: 0, trailingTokens: 0, lastUsageIndex: -1 }),
generateBranchSummary: async () => ({ summary: "", aborted: false, readFiles: [], modifiedFiles: [] }),
prepareCompaction: () => ({ dummy: true }),
shouldCompact: () => false,
}));
describe("AgentSession auto-compaction queue resume", () => {
let session: AgentSession;
let tempDir: string;
beforeEach(() => {
// Fresh temp dir per test so settings/auth files never collide between runs.
tempDir = join(tmpdir(), `pi-auto-compaction-queue-${Date.now()}`);
mkdirSync(tempDir, { recursive: true });
// Fake timers let the test advance the session's scheduled resume deterministically.
vi.useFakeTimers();
const model = getModel("anthropic", "claude-sonnet-4-5")!;
const agent = new Agent({
initialState: {
model,
systemPrompt: "Test",
tools: [],
},
});
const sessionManager = SessionManager.inMemory();
const settingsManager = SettingsManager.create(tempDir, tempDir);
const authStorage = new AuthStorage(join(tempDir, "auth.json"));
// Runtime-only API key avoids touching real credentials on disk.
authStorage.setRuntimeApiKey("anthropic", "test-key");
const modelRegistry = new ModelRegistry(authStorage, tempDir);
session = new AgentSession({
agent,
sessionManager,
settingsManager,
cwd: tempDir,
modelRegistry,
resourceLoader: createTestResourceLoader(),
});
});
afterEach(() => {
// Dispose the session first so nothing fires after the temp dir is removed.
session.dispose();
vi.useRealTimers();
vi.restoreAllMocks();
if (tempDir && existsSync(tempDir)) {
rmSync(tempDir, { recursive: true });
}
});
it("should resume after threshold compaction when only agent-level queued messages exist", async () => {
session.agent.followUp({
role: "custom",
customType: "test",
content: [{ type: "text", text: "Queued custom" }],
display: false,
timestamp: Date.now(),
});
expect(session.pendingMessageCount).toBe(0);
expect(session.agent.hasQueuedMessages()).toBe(true);
const continueSpy = vi.spyOn(session.agent, "continue").mockResolvedValue();
const runAutoCompaction = (
session as unknown as {
_runAutoCompaction: (reason: "overflow" | "threshold", willRetry: boolean) => Promise<void>;
}
)._runAutoCompaction.bind(session);
await runAutoCompaction("threshold", false);
await vi.advanceTimersByTimeAsync(100);
expect(continueSpy).toHaveBeenCalledTimes(1);
});
});