diff --git a/packages/coding-agent/src/core/agent-session.ts b/packages/coding-agent/src/core/agent-session.ts index 06601e3e..bf9984ec 100644 --- a/packages/coding-agent/src/core/agent-session.ts +++ b/packages/coding-agent/src/core/agent-session.ts @@ -138,8 +138,10 @@ export class AgentSession { private _unsubscribeAgent?: () => void; private _eventListeners: AgentSessionEventListener[] = []; - // Message queue state - private _queuedMessages: string[] = []; + /** Tracks pending steering messages for UI display. Removed when delivered. */ + private _steeringMessages: string[] = []; + /** Tracks pending follow-up messages for UI display. Removed when delivered. */ + private _followUpMessages: string[] = []; // Compaction state private _compactionAbortController: AbortController | undefined = undefined; @@ -207,16 +209,21 @@ export class AgentSession { /** Internal handler for agent events - shared by subscribe and reconnect */ private _handleAgentEvent = async (event: AgentEvent): Promise => { - // When a user message starts, check if it's from the queue and remove it BEFORE emitting + // When a user message starts, check if it's from either queue and remove it BEFORE emitting // This ensures the UI sees the updated queue state - if (event.type === "message_start" && event.message.role === "user" && this._queuedMessages.length > 0) { - // Extract text content from the message + if (event.type === "message_start" && event.message.role === "user") { const messageText = this._getUserMessageText(event.message); - if (messageText && this._queuedMessages.includes(messageText)) { - // Remove the first occurrence of this message from the queue - const index = this._queuedMessages.indexOf(messageText); - if (index !== -1) { - this._queuedMessages.splice(index, 1); + if (messageText) { + // Check steering queue first + const steeringIndex = this._steeringMessages.indexOf(messageText); + if (steeringIndex !== -1) { + this._steeringMessages.splice(steeringIndex, 1); + } else { + // Check follow-up queue + const followUpIndex = this._followUpMessages.indexOf(messageText); + if (followUpIndex !== -1) { + this._followUpMessages.splice(followUpIndex, 1); + } } } } @@ -418,9 +425,14 @@ export class AgentSession { return this.agent.state.messages; } - /** Current queue mode */ - get queueMode(): "all" | "one-at-a-time" { - return this.agent.getQueueMode(); + /** Current steering mode */ + get steeringMode(): "all" | "one-at-a-time" { + return this.agent.getSteeringMode(); + } + + /** Current follow-up mode */ + get followUpMode(): "all" | "one-at-a-time" { + return this.agent.getFollowUpMode(); } /** Current session file path, or undefined if sessions are disabled */ @@ -456,7 +468,7 @@ export class AgentSession { */ async prompt(text: string, options?: PromptOptions): Promise { if (this.isStreaming) { - throw new Error("Agent is already processing. Use queueMessage() to queue messages during streaming."); + throw new Error("Agent is already processing. Use steer() or followUp() to queue messages during streaming."); } // Flush any pending bash messages before the new prompt @@ -565,12 +577,25 @@ export class AgentSession { } /** - * Queue a message to be sent after the current response completes. - * Use when agent is currently streaming. + * Queue a steering message to interrupt the agent mid-run. + * Delivered after current tool execution, skips remaining tools. */ - async queueMessage(text: string): Promise { - this._queuedMessages.push(text); - await this.agent.queueMessage({ + async steer(text: string): Promise { + this._steeringMessages.push(text); + this.agent.steer({ + role: "user", + content: [{ type: "text", text }], + timestamp: Date.now(), + }); + } + + /** + * Queue a follow-up message to be processed after the agent finishes. + * Delivered only when agent has no more tool calls or steering messages. + */ + async followUp(text: string): Promise { + this._followUpMessages.push(text); + this.agent.followUp({ role: "user", content: [{ type: "text", text }], timestamp: Date.now(), @@ -586,11 +611,12 @@ export class AgentSession { * - Not streaming + no trigger: appends to state/session, no turn * * @param message Hook message with customType, content, display, details - * @param triggerTurn If true and not streaming, triggers a new LLM turn + * @param options.triggerTurn If true and not streaming, triggers a new LLM turn + * @param options.deliverAs When streaming, use "steer" (default) for immediate or "followUp" to wait */ async sendHookMessage( message: Pick, "customType" | "content" | "display" | "details">, - triggerTurn?: boolean, + options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" }, ): Promise { const appMessage = { role: "hookMessage" as const, @@ -602,8 +628,12 @@ export class AgentSession { } satisfies HookMessage; if (this.isStreaming) { // Queue for processing by agent loop - await this.agent.queueMessage(appMessage); - } else if (triggerTurn) { + if (options?.deliverAs === "followUp") { + this.agent.followUp(appMessage); + } else { + this.agent.steer(appMessage); + } + } else if (options?.triggerTurn) { // Send as prompt - agent loop will emit message events await this.agent.prompt(appMessage); } else { @@ -619,24 +649,32 @@ export class AgentSession { } /** - * Clear queued messages and return them. + * Clear all queued messages and return them. * Useful for restoring to editor when user aborts. + * @returns Object with steering and followUp arrays */ - clearQueue(): string[] { - const queued = [...this._queuedMessages]; - this._queuedMessages = []; - this.agent.clearMessageQueue(); - return queued; + clearQueue(): { steering: string[]; followUp: string[] } { + const steering = [...this._steeringMessages]; + const followUp = [...this._followUpMessages]; + this._steeringMessages = []; + this._followUpMessages = []; + this.agent.clearAllQueues(); + return { steering, followUp }; } - /** Number of messages currently queued */ - get queuedMessageCount(): number { - return this._queuedMessages.length; + /** Number of pending messages (includes both steering and follow-up) */ + get pendingMessageCount(): number { + return this._steeringMessages.length + this._followUpMessages.length; } - /** Get queued messages (read-only) */ - getQueuedMessages(): readonly string[] { - return this._queuedMessages; + /** Get pending steering messages (read-only) */ + getSteeringMessages(): readonly string[] { + return this._steeringMessages; + } + + /** Get pending follow-up messages (read-only) */ + getFollowUpMessages(): readonly string[] { + return this._followUpMessages; } get skillsSettings(): Required | undefined { @@ -678,7 +716,8 @@ export class AgentSession { await this.abort(); this.agent.reset(); this.sessionManager.newSession(options); - this._queuedMessages = []; + this._steeringMessages = []; + this._followUpMessages = []; this._reconnectToAgent(); // Emit session_switch event with reason "new" to hooks @@ -856,12 +895,21 @@ export class AgentSession { // ========================================================================= /** - * Set message queue mode. + * Set steering message mode. * Saves to settings. */ - setQueueMode(mode: "all" | "one-at-a-time"): void { - this.agent.setQueueMode(mode); - this.settingsManager.setQueueMode(mode); + setSteeringMode(mode: "all" | "one-at-a-time"): void { + this.agent.setSteeringMode(mode); + this.settingsManager.setSteeringMode(mode); + } + + /** + * Set follow-up message mode. + * Saves to settings. + */ + setFollowUpMode(mode: "all" | "one-at-a-time"): void { + this.agent.setFollowUpMode(mode); + this.settingsManager.setFollowUpMode(mode); } // ========================================================================= @@ -1450,7 +1498,8 @@ export class AgentSession { this._disconnectFromAgent(); await this.abort(); - this._queuedMessages = []; + this._steeringMessages = []; + this._followUpMessages = []; // Set new session this.sessionManager.setSessionFile(sessionPath); @@ -1882,7 +1931,7 @@ export class AgentSession { modelRegistry: this._modelRegistry, model: this.agent.state.model, isIdle: () => !this.isStreaming, - hasQueuedMessages: () => this.queuedMessageCount > 0, + hasPendingMessages: () => this.pendingMessageCount > 0, abort: () => { this.abort(); }, diff --git a/packages/coding-agent/src/core/custom-tools/types.ts b/packages/coding-agent/src/core/custom-tools/types.ts index 59d800f9..df6093bb 100644 --- a/packages/coding-agent/src/core/custom-tools/types.ts +++ b/packages/coding-agent/src/core/custom-tools/types.ts @@ -50,7 +50,7 @@ export interface CustomToolContext { /** Whether the agent is idle (not streaming) */ isIdle(): boolean; /** Whether there are queued messages waiting to be processed */ - hasQueuedMessages(): boolean; + hasPendingMessages(): boolean; /** Abort the current agent operation (fire-and-forget, does not wait) */ abort(): void; } diff --git a/packages/coding-agent/src/core/hooks/runner.ts b/packages/coding-agent/src/core/hooks/runner.ts index 56bb53a2..65dfe3f7 100644 --- a/packages/coding-agent/src/core/hooks/runner.ts +++ b/packages/coding-agent/src/core/hooks/runner.ts @@ -73,7 +73,7 @@ export class HookRunner { private isIdleFn: () => boolean = () => true; private waitForIdleFn: () => Promise = async () => {}; private abortFn: () => void = () => {}; - private hasQueuedMessagesFn: () => boolean = () => false; + private hasPendingMessagesFn: () => boolean = () => false; private newSessionHandler: NewSessionHandler = async () => ({ cancelled: false }); private branchHandler: BranchHandler = async () => ({ cancelled: false }); private navigateTreeHandler: NavigateTreeHandler = async () => ({ cancelled: false }); @@ -111,7 +111,7 @@ export class HookRunner { /** Function to abort current operation (fire-and-forget) */ abort?: () => void; /** Function to check if there are queued messages */ - hasQueuedMessages?: () => boolean; + hasPendingMessages?: () => boolean; /** UI context for interactive prompts */ uiContext?: HookUIContext; /** Whether UI is available */ @@ -121,7 +121,7 @@ export class HookRunner { this.isIdleFn = options.isIdle ?? (() => true); this.waitForIdleFn = options.waitForIdle ?? (async () => {}); this.abortFn = options.abort ?? (() => {}); - this.hasQueuedMessagesFn = options.hasQueuedMessages ?? (() => false); + this.hasPendingMessagesFn = options.hasPendingMessages ?? (() => false); // Store session handlers for HookCommandContext if (options.newSessionHandler) { this.newSessionHandler = options.newSessionHandler; @@ -250,7 +250,7 @@ export class HookRunner { model: this.getModel(), isIdle: () => this.isIdleFn(), abort: () => this.abortFn(), - hasQueuedMessages: () => this.hasQueuedMessagesFn(), + hasPendingMessages: () => this.hasPendingMessagesFn(), }; } diff --git a/packages/coding-agent/src/core/hooks/types.ts b/packages/coding-agent/src/core/hooks/types.ts index e2317d2d..f6503a34 100644 --- a/packages/coding-agent/src/core/hooks/types.ts +++ b/packages/coding-agent/src/core/hooks/types.ts @@ -161,7 +161,7 @@ export interface HookContext { /** Abort the current agent operation (fire-and-forget, does not wait) */ abort(): void; /** Whether there are queued messages waiting to be processed */ - hasQueuedMessages(): boolean; + hasPendingMessages(): boolean; } /** diff --git a/packages/coding-agent/src/core/sdk.ts b/packages/coding-agent/src/core/sdk.ts index 7f945bcc..9545fb39 100644 --- a/packages/coding-agent/src/core/sdk.ts +++ b/packages/coding-agent/src/core/sdk.ts @@ -575,7 +575,7 @@ export async function createAgentSession(options: CreateAgentSessionOptions = {} modelRegistry, model: agent.state.model, isIdle: () => !session.isStreaming, - hasQueuedMessages: () => session.queuedMessageCount > 0, + hasPendingMessages: () => session.pendingMessageCount > 0, abort: () => { session.abort(); }, diff --git a/packages/coding-agent/src/modes/interactive/interactive-mode.ts b/packages/coding-agent/src/modes/interactive/interactive-mode.ts index d9a21d70..9dcd8d6a 100644 --- a/packages/coding-agent/src/modes/interactive/interactive-mode.ts +++ b/packages/coding-agent/src/modes/interactive/interactive-mode.ts @@ -404,7 +404,7 @@ export class InteractiveMode { sendMessageHandler: (message, triggerTurn) => { const wasStreaming = this.session.isStreaming; this.session - .sendHookMessage(message, triggerTurn) + .sendHookMessage(message, { triggerTurn }) .then(() => { // For non-streaming cases with display=true, update UI // (streaming cases update via message_end event) @@ -486,7 +486,7 @@ export class InteractiveMode { abort: () => { this.session.abort(); }, - hasQueuedMessages: () => this.session.queuedMessageCount > 0, + hasPendingMessages: () => this.session.pendingMessageCount > 0, uiContext, hasUI: true, }); @@ -522,7 +522,7 @@ export class InteractiveMode { modelRegistry: this.session.modelRegistry, model: this.session.model, isIdle: () => !this.session.isStreaming, - hasQueuedMessages: () => this.session.queuedMessageCount > 0, + hasPendingMessages: () => this.session.pendingMessageCount > 0, abort: () => { this.session.abort(); }, @@ -737,8 +737,9 @@ export class InteractiveMode { this.editor.onEscape = () => { if (this.loadingAnimation) { // Abort and restore queued messages to editor - const queuedMessages = this.session.clearQueue(); - const queuedText = queuedMessages.join("\n\n"); + const { steering, followUp } = this.session.clearQueue(); + const allQueued = [...steering, ...followUp]; + const queuedText = allQueued.join("\n\n"); const currentText = this.editor.getText(); const combinedText = [queuedText, currentText].filter((t) => t.trim()).join("\n\n"); this.editor.setText(combinedText); @@ -1599,12 +1600,17 @@ export class InteractiveMode { private updatePendingMessagesDisplay(): void { this.pendingMessagesContainer.clear(); - const queuedMessages = this.session.getQueuedMessages(); - if (queuedMessages.length > 0) { + const steeringMessages = this.session.getSteeringMessages(); + const followUpMessages = this.session.getFollowUpMessages(); + if (steeringMessages.length > 0 || followUpMessages.length > 0) { this.pendingMessagesContainer.addChild(new Spacer(1)); - for (const message of queuedMessages) { - const queuedText = theme.fg("dim", `Queued: ${message}`); - this.pendingMessagesContainer.addChild(new TruncatedText(queuedText, 1, 0)); + for (const message of steeringMessages) { + const text = theme.fg("dim", `Steering: ${message}`); + this.pendingMessagesContainer.addChild(new TruncatedText(text, 1, 0)); + } + for (const message of followUpMessages) { + const text = theme.fg("dim", `Follow-up: ${message}`); + this.pendingMessagesContainer.addChild(new TruncatedText(text, 1, 0)); } } } diff --git a/packages/coding-agent/src/modes/print-mode.ts b/packages/coding-agent/src/modes/print-mode.ts index 04ff4f36..e2b51544 100644 --- a/packages/coding-agent/src/modes/print-mode.ts +++ b/packages/coding-agent/src/modes/print-mode.ts @@ -33,7 +33,7 @@ export async function runPrintMode( hookRunner.initialize({ getModel: () => session.model, sendMessageHandler: (message, triggerTurn) => { - session.sendHookMessage(message, triggerTurn).catch((e) => { + session.sendHookMessage(message, { triggerTurn }).catch((e) => { console.error(`Hook sendMessage failed: ${e instanceof Error ? e.message : String(e)}`); }); }, @@ -64,7 +64,7 @@ export async function runPrintMode( modelRegistry: session.modelRegistry, model: session.model, isIdle: () => !session.isStreaming, - hasQueuedMessages: () => session.queuedMessageCount > 0, + hasPendingMessages: () => session.pendingMessageCount > 0, abort: () => { session.abort(); }, diff --git a/packages/coding-agent/src/modes/rpc/rpc-mode.ts b/packages/coding-agent/src/modes/rpc/rpc-mode.ts index 84135753..090f4d5b 100644 --- a/packages/coding-agent/src/modes/rpc/rpc-mode.ts +++ b/packages/coding-agent/src/modes/rpc/rpc-mode.ts @@ -182,7 +182,7 @@ export async function runRpcMode(session: AgentSession): Promise { hookRunner.initialize({ getModel: () => session.agent.state.model, sendMessageHandler: (message, triggerTurn) => { - session.sendHookMessage(message, triggerTurn).catch((e) => { + session.sendHookMessage(message, { triggerTurn }).catch((e) => { output(error(undefined, "hook_send", e.message)); }); }, @@ -216,7 +216,7 @@ export async function runRpcMode(session: AgentSession): Promise { modelRegistry: session.modelRegistry, model: session.model, isIdle: () => !session.isStreaming, - hasQueuedMessages: () => session.queuedMessageCount > 0, + hasPendingMessages: () => session.pendingMessageCount > 0, abort: () => { session.abort(); }, @@ -284,7 +284,7 @@ export async function runRpcMode(session: AgentSession): Promise { sessionId: session.sessionId, autoCompactionEnabled: session.autoCompactionEnabled, messageCount: session.messages.length, - queuedMessageCount: session.queuedMessageCount, + pendingMessageCount: session.pendingMessageCount, }; return success(id, "get_state", state); } diff --git a/packages/coding-agent/src/modes/rpc/rpc-types.ts b/packages/coding-agent/src/modes/rpc/rpc-types.ts index a58b86c9..b40fa9a7 100644 --- a/packages/coding-agent/src/modes/rpc/rpc-types.ts +++ b/packages/coding-agent/src/modes/rpc/rpc-types.ts @@ -74,7 +74,7 @@ export interface RpcSessionState { sessionId: string; autoCompactionEnabled: boolean; messageCount: number; - queuedMessageCount: number; + pendingMessageCount: number; } // ============================================================================ diff --git a/packages/coding-agent/test/agent-session-concurrent.test.ts b/packages/coding-agent/test/agent-session-concurrent.test.ts index 75c68c2e..30f3692d 100644 --- a/packages/coding-agent/test/agent-session-concurrent.test.ts +++ b/packages/coding-agent/test/agent-session-concurrent.test.ts @@ -144,7 +144,7 @@ describe("AgentSession concurrent prompt guard", () => { // queueMessage should work while streaming expect(() => session.queueMessage("Queued message")).not.toThrow(); - expect(session.queuedMessageCount).toBe(1); + expect(session.pendingMessageCount).toBe(1); // Cleanup await session.abort();