From b050c582a1e578fee39712cc8d05168d4ce911be Mon Sep 17 00:00:00 2001 From: Mario Zechner Date: Fri, 6 Feb 2026 11:30:43 +0100 Subject: [PATCH] fix(agent,coding-agent): resume queued messages after auto-compaction --- AGENTS.md | 2 +- packages/agent/CHANGELOG.md | 4 + packages/agent/src/agent.ts | 82 +++++++++++----- packages/agent/test/agent.test.ts | 82 ++++++++++++++++ packages/coding-agent/CHANGELOG.md | 1 + .../coding-agent/src/core/agent-session.ts | 6 ++ ...gent-session-auto-compaction-queue.test.ts | 97 +++++++++++++++++++ 7 files changed, 247 insertions(+), 27 deletions(-) create mode 100644 packages/coding-agent/test/agent-session-auto-compaction-queue.test.ts diff --git a/AGENTS.md b/AGENTS.md index f16e830d..336665fa 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -23,7 +23,7 @@ read README.md, then ask which module(s) to work on. Based on the answer, read t - After code changes (not documentation changes): `npm run check` (get full output, no tail). Fix all errors, warnings, and infos before committing. - Note: `npm run check` does not run tests. - NEVER run: `npm run dev`, `npm run build`, `npm test` -- Only run specific tests if user instructs: `npm test -- test/specific.test.ts` +- Only run specific tests if user instructs: `npx tsx ./node_modules/vitest/vitest.mjs --run test/specific.test.ts` - Run tests from the package root, not the repo root. - When writing tests, run them, identify issues in either the test or implementation, and iterate until fixed. 
- NEVER commit unless user asks diff --git a/packages/agent/CHANGELOG.md b/packages/agent/CHANGELOG.md index 15c24779..92a089c4 100644 --- a/packages/agent/CHANGELOG.md +++ b/packages/agent/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Fixed + +- Fixed `continue()` to resume queued steering/follow-up messages when context currently ends in an assistant message, and preserved one-at-a-time steering ordering during assistant-tail resumes ([#1312](https://github.com/badlogic/pi-mono/pull/1312) by [@ferologics](https://github.com/ferologics)) + ## [0.52.6] - 2026-02-05 ## [0.52.5] - 2026-02-05 diff --git a/packages/agent/src/agent.ts b/packages/agent/src/agent.ts index e99630da..1f310281 100644 --- a/packages/agent/src/agent.ts +++ b/packages/agent/src/agent.ts @@ -252,6 +252,40 @@ export class Agent { this.followUpQueue = []; } + hasQueuedMessages(): boolean { + return this.steeringQueue.length > 0 || this.followUpQueue.length > 0; + } + + private dequeueSteeringMessages(): AgentMessage[] { + if (this.steeringMode === "one-at-a-time") { + if (this.steeringQueue.length > 0) { + const first = this.steeringQueue[0]; + this.steeringQueue = this.steeringQueue.slice(1); + return [first]; + } + return []; + } + + const steering = this.steeringQueue.slice(); + this.steeringQueue = []; + return steering; + } + + private dequeueFollowUpMessages(): AgentMessage[] { + if (this.followUpMode === "one-at-a-time") { + if (this.followUpQueue.length > 0) { + const first = this.followUpQueue[0]; + this.followUpQueue = this.followUpQueue.slice(1); + return [first]; + } + return []; + } + + const followUp = this.followUpQueue.slice(); + this.followUpQueue = []; + return followUp; + } + clearMessages() { this._state.messages = []; } @@ -310,7 +344,9 @@ export class Agent { await this._runLoop(msgs); } - /** Continue from current context (for retry after overflow) */ + /** + * Continue from current context (used for retries and resuming queued messages). 
+ */ async continue() { if (this._state.isStreaming) { throw new Error("Agent is already processing. Wait for completion before continuing."); @@ -321,6 +357,18 @@ export class Agent { throw new Error("No messages to continue from"); } if (messages[messages.length - 1].role === "assistant") { + const queuedSteering = this.dequeueSteeringMessages(); + if (queuedSteering.length > 0) { + await this._runLoop(queuedSteering, { skipInitialSteeringPoll: true }); + return; + } + + const queuedFollowUp = this.dequeueFollowUpMessages(); + if (queuedFollowUp.length > 0) { + await this._runLoop(queuedFollowUp); + return; + } + throw new Error("Cannot continue from message role: assistant"); } @@ -332,7 +380,7 @@ export class Agent { * If messages are provided, starts a new conversation turn with those messages. * Otherwise, continues from existing context. */ - private async _runLoop(messages?: AgentMessage[]) { + private async _runLoop(messages?: AgentMessage[], options?: { skipInitialSteeringPoll?: boolean }) { const model = this._state.model; if (!model) throw new Error("No model configured"); @@ -353,6 +401,8 @@ export class Agent { tools: this._state.tools, }; + let skipInitialSteeringPoll = options?.skipInitialSteeringPoll === true; + const config: AgentLoopConfig = { model, reasoning, @@ -363,33 +413,13 @@ export class Agent { transformContext: this.transformContext, getApiKey: this.getApiKey, getSteeringMessages: async () => { - if (this.steeringMode === "one-at-a-time") { - if (this.steeringQueue.length > 0) { - const first = this.steeringQueue[0]; - this.steeringQueue = this.steeringQueue.slice(1); - return [first]; - } + if (skipInitialSteeringPoll) { + skipInitialSteeringPoll = false; return []; - } else { - const steering = this.steeringQueue.slice(); - this.steeringQueue = []; - return steering; - } - }, - getFollowUpMessages: async () => { - if (this.followUpMode === "one-at-a-time") { - if (this.followUpQueue.length > 0) { - const first = this.followUpQueue[0]; 
- this.followUpQueue = this.followUpQueue.slice(1); - return [first]; - } - return []; - } else { - const followUp = this.followUpQueue.slice(); - this.followUpQueue = []; - return followUp; } + return this.dequeueSteeringMessages(); }, + getFollowUpMessages: async () => this.dequeueFollowUpMessages(), }; let partial: AgentMessage | null = null; diff --git a/packages/agent/test/agent.test.ts b/packages/agent/test/agent.test.ts index 44df9117..3b476344 100644 --- a/packages/agent/test/agent.test.ts +++ b/packages/agent/test/agent.test.ts @@ -230,6 +230,88 @@ describe("Agent", () => { await firstPrompt.catch(() => {}); }); + it("continue() should process queued follow-up messages after an assistant turn", async () => { + const agent = new Agent({ + streamFn: () => { + const stream = new MockAssistantStream(); + queueMicrotask(() => { + stream.push({ type: "done", reason: "stop", message: createAssistantMessage("Processed") }); + }); + return stream; + }, + }); + + agent.replaceMessages([ + { + role: "user", + content: [{ type: "text", text: "Initial" }], + timestamp: Date.now() - 10, + }, + createAssistantMessage("Initial response"), + ]); + + agent.followUp({ + role: "user", + content: [{ type: "text", text: "Queued follow-up" }], + timestamp: Date.now(), + }); + + await expect(agent.continue()).resolves.toBeUndefined(); + + const hasQueuedFollowUp = agent.state.messages.some((message) => { + if (message.role !== "user") return false; + if (typeof message.content === "string") return message.content === "Queued follow-up"; + return message.content.some((part) => part.type === "text" && part.text === "Queued follow-up"); + }); + + expect(hasQueuedFollowUp).toBe(true); + expect(agent.state.messages[agent.state.messages.length - 1].role).toBe("assistant"); + }); + + it("continue() should keep one-at-a-time steering semantics from assistant tail", async () => { + let responseCount = 0; + const agent = new Agent({ + streamFn: () => { + const stream = new 
MockAssistantStream(); + responseCount++; + queueMicrotask(() => { + stream.push({ + type: "done", + reason: "stop", + message: createAssistantMessage(`Processed ${responseCount}`), + }); + }); + return stream; + }, + }); + + agent.replaceMessages([ + { + role: "user", + content: [{ type: "text", text: "Initial" }], + timestamp: Date.now() - 10, + }, + createAssistantMessage("Initial response"), + ]); + + agent.steer({ + role: "user", + content: [{ type: "text", text: "Steering 1" }], + timestamp: Date.now(), + }); + agent.steer({ + role: "user", + content: [{ type: "text", text: "Steering 2" }], + timestamp: Date.now() + 1, + }); + + await expect(agent.continue()).resolves.toBeUndefined(); + + const recentMessages = agent.state.messages.slice(-4); + expect(recentMessages.map((m) => m.role)).toEqual(["user", "assistant", "user", "assistant"]); + expect(responseCount).toBe(2); + }); + it("forwards sessionId to streamFn options", async () => { let receivedSessionId: string | undefined; const agent = new Agent({ diff --git a/packages/coding-agent/CHANGELOG.md b/packages/coding-agent/CHANGELOG.md index 8c38409a..5474e943 100644 --- a/packages/coding-agent/CHANGELOG.md +++ b/packages/coding-agent/CHANGELOG.md @@ -5,6 +5,7 @@ ### Fixed - Fixed extra spacing between thinking-only assistant content and subsequent tool execution blocks when assistant messages contain no text +- Fixed queued steering/follow-up/custom messages remaining stuck after threshold auto-compaction by resuming the agent loop when Agent-level queues still contain pending messages ([#1312](https://github.com/badlogic/pi-mono/pull/1312) by [@ferologics](https://github.com/ferologics)) ## [0.52.6] - 2026-02-05 diff --git a/packages/coding-agent/src/core/agent-session.ts b/packages/coding-agent/src/core/agent-session.ts index 20e5dadf..cdc6abba 100644 --- a/packages/coding-agent/src/core/agent-session.ts +++ b/packages/coding-agent/src/core/agent-session.ts @@ -1671,6 +1671,12 @@ export class AgentSession 
{ this.agent.replaceMessages(messages.slice(0, -1)); } + setTimeout(() => { + this.agent.continue().catch(() => {}); + }, 100); + } else if (this.agent.hasQueuedMessages()) { + // Auto-compaction can complete while follow-up/steering/custom messages are waiting. + // Kick the loop so queued messages are actually delivered. setTimeout(() => { this.agent.continue().catch(() => {}); }, 100); diff --git a/packages/coding-agent/test/agent-session-auto-compaction-queue.test.ts b/packages/coding-agent/test/agent-session-auto-compaction-queue.test.ts new file mode 100644 index 00000000..0c99307f --- /dev/null +++ b/packages/coding-agent/test/agent-session-auto-compaction-queue.test.ts @@ -0,0 +1,97 @@ +import { existsSync, mkdirSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { Agent } from "@mariozechner/pi-agent-core"; +import { getModel } from "@mariozechner/pi-ai"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { AgentSession } from "../src/core/agent-session.js"; +import { AuthStorage } from "../src/core/auth-storage.js"; +import { ModelRegistry } from "../src/core/model-registry.js"; +import { SessionManager } from "../src/core/session-manager.js"; +import { SettingsManager } from "../src/core/settings-manager.js"; +import { createTestResourceLoader } from "./utilities.js"; + +vi.mock("../src/core/compaction/index.js", () => ({ + calculateContextTokens: () => 0, + collectEntriesForBranchSummary: () => ({ entries: [], commonAncestorId: null }), + compact: async () => ({ + summary: "compacted", + firstKeptEntryId: "entry-1", + tokensBefore: 100, + details: {}, + }), + estimateContextTokens: () => ({ tokens: 0, usageTokens: 0, trailingTokens: 0, lastUsageIndex: -1 }), + generateBranchSummary: async () => ({ summary: "", aborted: false, readFiles: [], modifiedFiles: [] }), + prepareCompaction: () => ({ dummy: true }), + shouldCompact: () => false, +})); + 
+describe("AgentSession auto-compaction queue resume", () => { + let session: AgentSession; + let tempDir: string; + + beforeEach(() => { + tempDir = join(tmpdir(), `pi-auto-compaction-queue-${Date.now()}`); + mkdirSync(tempDir, { recursive: true }); + vi.useFakeTimers(); + + const model = getModel("anthropic", "claude-sonnet-4-5")!; + const agent = new Agent({ + initialState: { + model, + systemPrompt: "Test", + tools: [], + }, + }); + + const sessionManager = SessionManager.inMemory(); + const settingsManager = SettingsManager.create(tempDir, tempDir); + const authStorage = new AuthStorage(join(tempDir, "auth.json")); + authStorage.setRuntimeApiKey("anthropic", "test-key"); + const modelRegistry = new ModelRegistry(authStorage, tempDir); + + session = new AgentSession({ + agent, + sessionManager, + settingsManager, + cwd: tempDir, + modelRegistry, + resourceLoader: createTestResourceLoader(), + }); + }); + + afterEach(() => { + session.dispose(); + vi.useRealTimers(); + vi.restoreAllMocks(); + if (tempDir && existsSync(tempDir)) { + rmSync(tempDir, { recursive: true }); + } + }); + + it("should resume after threshold compaction when only agent-level queued messages exist", async () => { + session.agent.followUp({ + role: "custom", + customType: "test", + content: [{ type: "text", text: "Queued custom" }], + display: false, + timestamp: Date.now(), + }); + + expect(session.pendingMessageCount).toBe(0); + expect(session.agent.hasQueuedMessages()).toBe(true); + + const continueSpy = vi.spyOn(session.agent, "continue").mockResolvedValue(); + + const runAutoCompaction = ( + session as unknown as { + _runAutoCompaction: (reason: "overflow" | "threshold", willRetry: boolean) => Promise<void>; + } + )._runAutoCompaction.bind(session); + + await runAutoCompaction("threshold", false); + await vi.advanceTimersByTimeAsync(100); + + expect(continueSpy).toHaveBeenCalledTimes(1); + }); +});