feat(coding-agent): update AgentSession for steer()/followUp() API

- Rename queueMessage to steer(), add followUp()
- Split _pendingMessages into _steeringMessages and _followUpMessages
- Update sendHookMessage to accept deliverAs option
- Rename hasQueuedMessages to hasPendingMessages
- Rename queuedMessageCount to pendingMessageCount
- Update clearQueue() return type to { steering, followUp }
- Update UI to show steering vs follow-up messages differently

WIP: settings-manager, sdk, interactive-mode, rpc-mode still need updates
This commit is contained in:
Mario Zechner 2026-01-02 23:24:27 +01:00
parent a980998464
commit 58c423ba36
10 changed files with 121 additions and 66 deletions

View file

@ -138,8 +138,10 @@ export class AgentSession {
private _unsubscribeAgent?: () => void; private _unsubscribeAgent?: () => void;
private _eventListeners: AgentSessionEventListener[] = []; private _eventListeners: AgentSessionEventListener[] = [];
// Message queue state /** Tracks pending steering messages for UI display. Removed when delivered. */
private _queuedMessages: string[] = []; private _steeringMessages: string[] = [];
/** Tracks pending follow-up messages for UI display. Removed when delivered. */
private _followUpMessages: string[] = [];
// Compaction state // Compaction state
private _compactionAbortController: AbortController | undefined = undefined; private _compactionAbortController: AbortController | undefined = undefined;
@ -207,16 +209,21 @@ export class AgentSession {
/** Internal handler for agent events - shared by subscribe and reconnect */ /** Internal handler for agent events - shared by subscribe and reconnect */
private _handleAgentEvent = async (event: AgentEvent): Promise<void> => { private _handleAgentEvent = async (event: AgentEvent): Promise<void> => {
// When a user message starts, check if it's from the queue and remove it BEFORE emitting // When a user message starts, check if it's from either queue and remove it BEFORE emitting
// This ensures the UI sees the updated queue state // This ensures the UI sees the updated queue state
if (event.type === "message_start" && event.message.role === "user" && this._queuedMessages.length > 0) { if (event.type === "message_start" && event.message.role === "user") {
// Extract text content from the message
const messageText = this._getUserMessageText(event.message); const messageText = this._getUserMessageText(event.message);
if (messageText && this._queuedMessages.includes(messageText)) { if (messageText) {
// Remove the first occurrence of this message from the queue // Check steering queue first
const index = this._queuedMessages.indexOf(messageText); const steeringIndex = this._steeringMessages.indexOf(messageText);
if (index !== -1) { if (steeringIndex !== -1) {
this._queuedMessages.splice(index, 1); this._steeringMessages.splice(steeringIndex, 1);
} else {
// Check follow-up queue
const followUpIndex = this._followUpMessages.indexOf(messageText);
if (followUpIndex !== -1) {
this._followUpMessages.splice(followUpIndex, 1);
}
} }
} }
} }
@ -418,9 +425,14 @@ export class AgentSession {
return this.agent.state.messages; return this.agent.state.messages;
} }
/** Current queue mode */ /** Current steering mode */
get queueMode(): "all" | "one-at-a-time" { get steeringMode(): "all" | "one-at-a-time" {
return this.agent.getQueueMode(); return this.agent.getSteeringMode();
}
/** Current follow-up mode */
get followUpMode(): "all" | "one-at-a-time" {
return this.agent.getFollowUpMode();
} }
/** Current session file path, or undefined if sessions are disabled */ /** Current session file path, or undefined if sessions are disabled */
@ -456,7 +468,7 @@ export class AgentSession {
*/ */
async prompt(text: string, options?: PromptOptions): Promise<void> { async prompt(text: string, options?: PromptOptions): Promise<void> {
if (this.isStreaming) { if (this.isStreaming) {
throw new Error("Agent is already processing. Use queueMessage() to queue messages during streaming."); throw new Error("Agent is already processing. Use steer() or followUp() to queue messages during streaming.");
} }
// Flush any pending bash messages before the new prompt // Flush any pending bash messages before the new prompt
@ -565,12 +577,25 @@ export class AgentSession {
} }
/** /**
* Queue a message to be sent after the current response completes. * Queue a steering message to interrupt the agent mid-run.
* Use when agent is currently streaming. * Delivered after current tool execution, skips remaining tools.
*/ */
async queueMessage(text: string): Promise<void> { async steer(text: string): Promise<void> {
this._queuedMessages.push(text); this._steeringMessages.push(text);
await this.agent.queueMessage({ this.agent.steer({
role: "user",
content: [{ type: "text", text }],
timestamp: Date.now(),
});
}
/**
* Queue a follow-up message to be processed after the agent finishes.
* Delivered only when agent has no more tool calls or steering messages.
*/
async followUp(text: string): Promise<void> {
this._followUpMessages.push(text);
this.agent.followUp({
role: "user", role: "user",
content: [{ type: "text", text }], content: [{ type: "text", text }],
timestamp: Date.now(), timestamp: Date.now(),
@ -586,11 +611,12 @@ export class AgentSession {
* - Not streaming + no trigger: appends to state/session, no turn * - Not streaming + no trigger: appends to state/session, no turn
* *
* @param message Hook message with customType, content, display, details * @param message Hook message with customType, content, display, details
* @param triggerTurn If true and not streaming, triggers a new LLM turn * @param options.triggerTurn If true and not streaming, triggers a new LLM turn
* @param options.deliverAs When streaming, use "steer" (default) for immediate or "followUp" to wait
*/ */
async sendHookMessage<T = unknown>( async sendHookMessage<T = unknown>(
message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">, message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">,
triggerTurn?: boolean, options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" },
): Promise<void> { ): Promise<void> {
const appMessage = { const appMessage = {
role: "hookMessage" as const, role: "hookMessage" as const,
@ -602,8 +628,12 @@ export class AgentSession {
} satisfies HookMessage<T>; } satisfies HookMessage<T>;
if (this.isStreaming) { if (this.isStreaming) {
// Queue for processing by agent loop // Queue for processing by agent loop
await this.agent.queueMessage(appMessage); if (options?.deliverAs === "followUp") {
} else if (triggerTurn) { this.agent.followUp(appMessage);
} else {
this.agent.steer(appMessage);
}
} else if (options?.triggerTurn) {
// Send as prompt - agent loop will emit message events // Send as prompt - agent loop will emit message events
await this.agent.prompt(appMessage); await this.agent.prompt(appMessage);
} else { } else {
@ -619,24 +649,32 @@ export class AgentSession {
} }
/** /**
* Clear queued messages and return them. * Clear all queued messages and return them.
* Useful for restoring to editor when user aborts. * Useful for restoring to editor when user aborts.
* @returns Object with steering and followUp arrays
*/ */
clearQueue(): string[] { clearQueue(): { steering: string[]; followUp: string[] } {
const queued = [...this._queuedMessages]; const steering = [...this._steeringMessages];
this._queuedMessages = []; const followUp = [...this._followUpMessages];
this.agent.clearMessageQueue(); this._steeringMessages = [];
return queued; this._followUpMessages = [];
this.agent.clearAllQueues();
return { steering, followUp };
} }
/** Number of messages currently queued */ /** Number of pending messages (includes both steering and follow-up) */
get queuedMessageCount(): number { get pendingMessageCount(): number {
return this._queuedMessages.length; return this._steeringMessages.length + this._followUpMessages.length;
} }
/** Get queued messages (read-only) */ /** Get pending steering messages (read-only) */
getQueuedMessages(): readonly string[] { getSteeringMessages(): readonly string[] {
return this._queuedMessages; return this._steeringMessages;
}
/** Get pending follow-up messages (read-only) */
getFollowUpMessages(): readonly string[] {
return this._followUpMessages;
} }
get skillsSettings(): Required<SkillsSettings> | undefined { get skillsSettings(): Required<SkillsSettings> | undefined {
@ -678,7 +716,8 @@ export class AgentSession {
await this.abort(); await this.abort();
this.agent.reset(); this.agent.reset();
this.sessionManager.newSession(options); this.sessionManager.newSession(options);
this._queuedMessages = []; this._steeringMessages = [];
this._followUpMessages = [];
this._reconnectToAgent(); this._reconnectToAgent();
// Emit session_switch event with reason "new" to hooks // Emit session_switch event with reason "new" to hooks
@ -856,12 +895,21 @@ export class AgentSession {
// ========================================================================= // =========================================================================
/** /**
* Set message queue mode. * Set steering message mode.
* Saves to settings. * Saves to settings.
*/ */
setQueueMode(mode: "all" | "one-at-a-time"): void { setSteeringMode(mode: "all" | "one-at-a-time"): void {
this.agent.setQueueMode(mode); this.agent.setSteeringMode(mode);
this.settingsManager.setQueueMode(mode); this.settingsManager.setSteeringMode(mode);
}
/**
* Set follow-up message mode.
* Saves to settings.
*/
setFollowUpMode(mode: "all" | "one-at-a-time"): void {
this.agent.setFollowUpMode(mode);
this.settingsManager.setFollowUpMode(mode);
} }
// ========================================================================= // =========================================================================
@ -1450,7 +1498,8 @@ export class AgentSession {
this._disconnectFromAgent(); this._disconnectFromAgent();
await this.abort(); await this.abort();
this._queuedMessages = []; this._steeringMessages = [];
this._followUpMessages = [];
// Set new session // Set new session
this.sessionManager.setSessionFile(sessionPath); this.sessionManager.setSessionFile(sessionPath);
@ -1882,7 +1931,7 @@ export class AgentSession {
modelRegistry: this._modelRegistry, modelRegistry: this._modelRegistry,
model: this.agent.state.model, model: this.agent.state.model,
isIdle: () => !this.isStreaming, isIdle: () => !this.isStreaming,
hasQueuedMessages: () => this.queuedMessageCount > 0, hasPendingMessages: () => this.pendingMessageCount > 0,
abort: () => { abort: () => {
this.abort(); this.abort();
}, },

View file

@ -50,7 +50,7 @@ export interface CustomToolContext {
/** Whether the agent is idle (not streaming) */ /** Whether the agent is idle (not streaming) */
isIdle(): boolean; isIdle(): boolean;
/** Whether there are queued messages waiting to be processed */ /** Whether there are queued messages waiting to be processed */
hasQueuedMessages(): boolean; hasPendingMessages(): boolean;
/** Abort the current agent operation (fire-and-forget, does not wait) */ /** Abort the current agent operation (fire-and-forget, does not wait) */
abort(): void; abort(): void;
} }

View file

@ -73,7 +73,7 @@ export class HookRunner {
private isIdleFn: () => boolean = () => true; private isIdleFn: () => boolean = () => true;
private waitForIdleFn: () => Promise<void> = async () => {}; private waitForIdleFn: () => Promise<void> = async () => {};
private abortFn: () => void = () => {}; private abortFn: () => void = () => {};
private hasQueuedMessagesFn: () => boolean = () => false; private hasPendingMessagesFn: () => boolean = () => false;
private newSessionHandler: NewSessionHandler = async () => ({ cancelled: false }); private newSessionHandler: NewSessionHandler = async () => ({ cancelled: false });
private branchHandler: BranchHandler = async () => ({ cancelled: false }); private branchHandler: BranchHandler = async () => ({ cancelled: false });
private navigateTreeHandler: NavigateTreeHandler = async () => ({ cancelled: false }); private navigateTreeHandler: NavigateTreeHandler = async () => ({ cancelled: false });
@ -111,7 +111,7 @@ export class HookRunner {
/** Function to abort current operation (fire-and-forget) */ /** Function to abort current operation (fire-and-forget) */
abort?: () => void; abort?: () => void;
/** Function to check if there are queued messages */ /** Function to check if there are queued messages */
hasQueuedMessages?: () => boolean; hasPendingMessages?: () => boolean;
/** UI context for interactive prompts */ /** UI context for interactive prompts */
uiContext?: HookUIContext; uiContext?: HookUIContext;
/** Whether UI is available */ /** Whether UI is available */
@ -121,7 +121,7 @@ export class HookRunner {
this.isIdleFn = options.isIdle ?? (() => true); this.isIdleFn = options.isIdle ?? (() => true);
this.waitForIdleFn = options.waitForIdle ?? (async () => {}); this.waitForIdleFn = options.waitForIdle ?? (async () => {});
this.abortFn = options.abort ?? (() => {}); this.abortFn = options.abort ?? (() => {});
this.hasQueuedMessagesFn = options.hasQueuedMessages ?? (() => false); this.hasPendingMessagesFn = options.hasPendingMessages ?? (() => false);
// Store session handlers for HookCommandContext // Store session handlers for HookCommandContext
if (options.newSessionHandler) { if (options.newSessionHandler) {
this.newSessionHandler = options.newSessionHandler; this.newSessionHandler = options.newSessionHandler;
@ -250,7 +250,7 @@ export class HookRunner {
model: this.getModel(), model: this.getModel(),
isIdle: () => this.isIdleFn(), isIdle: () => this.isIdleFn(),
abort: () => this.abortFn(), abort: () => this.abortFn(),
hasQueuedMessages: () => this.hasQueuedMessagesFn(), hasPendingMessages: () => this.hasPendingMessagesFn(),
}; };
} }

View file

@ -161,7 +161,7 @@ export interface HookContext {
/** Abort the current agent operation (fire-and-forget, does not wait) */ /** Abort the current agent operation (fire-and-forget, does not wait) */
abort(): void; abort(): void;
/** Whether there are queued messages waiting to be processed */ /** Whether there are queued messages waiting to be processed */
hasQueuedMessages(): boolean; hasPendingMessages(): boolean;
} }
/** /**

View file

@ -575,7 +575,7 @@ export async function createAgentSession(options: CreateAgentSessionOptions = {}
modelRegistry, modelRegistry,
model: agent.state.model, model: agent.state.model,
isIdle: () => !session.isStreaming, isIdle: () => !session.isStreaming,
hasQueuedMessages: () => session.queuedMessageCount > 0, hasPendingMessages: () => session.pendingMessageCount > 0,
abort: () => { abort: () => {
session.abort(); session.abort();
}, },

View file

@ -404,7 +404,7 @@ export class InteractiveMode {
sendMessageHandler: (message, triggerTurn) => { sendMessageHandler: (message, triggerTurn) => {
const wasStreaming = this.session.isStreaming; const wasStreaming = this.session.isStreaming;
this.session this.session
.sendHookMessage(message, triggerTurn) .sendHookMessage(message, { triggerTurn })
.then(() => { .then(() => {
// For non-streaming cases with display=true, update UI // For non-streaming cases with display=true, update UI
// (streaming cases update via message_end event) // (streaming cases update via message_end event)
@ -486,7 +486,7 @@ export class InteractiveMode {
abort: () => { abort: () => {
this.session.abort(); this.session.abort();
}, },
hasQueuedMessages: () => this.session.queuedMessageCount > 0, hasPendingMessages: () => this.session.pendingMessageCount > 0,
uiContext, uiContext,
hasUI: true, hasUI: true,
}); });
@ -522,7 +522,7 @@ export class InteractiveMode {
modelRegistry: this.session.modelRegistry, modelRegistry: this.session.modelRegistry,
model: this.session.model, model: this.session.model,
isIdle: () => !this.session.isStreaming, isIdle: () => !this.session.isStreaming,
hasQueuedMessages: () => this.session.queuedMessageCount > 0, hasPendingMessages: () => this.session.pendingMessageCount > 0,
abort: () => { abort: () => {
this.session.abort(); this.session.abort();
}, },
@ -737,8 +737,9 @@ export class InteractiveMode {
this.editor.onEscape = () => { this.editor.onEscape = () => {
if (this.loadingAnimation) { if (this.loadingAnimation) {
// Abort and restore queued messages to editor // Abort and restore queued messages to editor
const queuedMessages = this.session.clearQueue(); const { steering, followUp } = this.session.clearQueue();
const queuedText = queuedMessages.join("\n\n"); const allQueued = [...steering, ...followUp];
const queuedText = allQueued.join("\n\n");
const currentText = this.editor.getText(); const currentText = this.editor.getText();
const combinedText = [queuedText, currentText].filter((t) => t.trim()).join("\n\n"); const combinedText = [queuedText, currentText].filter((t) => t.trim()).join("\n\n");
this.editor.setText(combinedText); this.editor.setText(combinedText);
@ -1599,12 +1600,17 @@ export class InteractiveMode {
private updatePendingMessagesDisplay(): void { private updatePendingMessagesDisplay(): void {
this.pendingMessagesContainer.clear(); this.pendingMessagesContainer.clear();
const queuedMessages = this.session.getQueuedMessages(); const steeringMessages = this.session.getSteeringMessages();
if (queuedMessages.length > 0) { const followUpMessages = this.session.getFollowUpMessages();
if (steeringMessages.length > 0 || followUpMessages.length > 0) {
this.pendingMessagesContainer.addChild(new Spacer(1)); this.pendingMessagesContainer.addChild(new Spacer(1));
for (const message of queuedMessages) { for (const message of steeringMessages) {
const queuedText = theme.fg("dim", `Queued: ${message}`); const text = theme.fg("dim", `Steering: ${message}`);
this.pendingMessagesContainer.addChild(new TruncatedText(queuedText, 1, 0)); this.pendingMessagesContainer.addChild(new TruncatedText(text, 1, 0));
}
for (const message of followUpMessages) {
const text = theme.fg("dim", `Follow-up: ${message}`);
this.pendingMessagesContainer.addChild(new TruncatedText(text, 1, 0));
} }
} }
} }

View file

@ -33,7 +33,7 @@ export async function runPrintMode(
hookRunner.initialize({ hookRunner.initialize({
getModel: () => session.model, getModel: () => session.model,
sendMessageHandler: (message, triggerTurn) => { sendMessageHandler: (message, triggerTurn) => {
session.sendHookMessage(message, triggerTurn).catch((e) => { session.sendHookMessage(message, { triggerTurn }).catch((e) => {
console.error(`Hook sendMessage failed: ${e instanceof Error ? e.message : String(e)}`); console.error(`Hook sendMessage failed: ${e instanceof Error ? e.message : String(e)}`);
}); });
}, },
@ -64,7 +64,7 @@ export async function runPrintMode(
modelRegistry: session.modelRegistry, modelRegistry: session.modelRegistry,
model: session.model, model: session.model,
isIdle: () => !session.isStreaming, isIdle: () => !session.isStreaming,
hasQueuedMessages: () => session.queuedMessageCount > 0, hasPendingMessages: () => session.pendingMessageCount > 0,
abort: () => { abort: () => {
session.abort(); session.abort();
}, },

View file

@ -182,7 +182,7 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
hookRunner.initialize({ hookRunner.initialize({
getModel: () => session.agent.state.model, getModel: () => session.agent.state.model,
sendMessageHandler: (message, triggerTurn) => { sendMessageHandler: (message, triggerTurn) => {
session.sendHookMessage(message, triggerTurn).catch((e) => { session.sendHookMessage(message, { triggerTurn }).catch((e) => {
output(error(undefined, "hook_send", e.message)); output(error(undefined, "hook_send", e.message));
}); });
}, },
@ -216,7 +216,7 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
modelRegistry: session.modelRegistry, modelRegistry: session.modelRegistry,
model: session.model, model: session.model,
isIdle: () => !session.isStreaming, isIdle: () => !session.isStreaming,
hasQueuedMessages: () => session.queuedMessageCount > 0, hasPendingMessages: () => session.pendingMessageCount > 0,
abort: () => { abort: () => {
session.abort(); session.abort();
}, },
@ -284,7 +284,7 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
sessionId: session.sessionId, sessionId: session.sessionId,
autoCompactionEnabled: session.autoCompactionEnabled, autoCompactionEnabled: session.autoCompactionEnabled,
messageCount: session.messages.length, messageCount: session.messages.length,
queuedMessageCount: session.queuedMessageCount, pendingMessageCount: session.pendingMessageCount,
}; };
return success(id, "get_state", state); return success(id, "get_state", state);
} }

View file

@ -74,7 +74,7 @@ export interface RpcSessionState {
sessionId: string; sessionId: string;
autoCompactionEnabled: boolean; autoCompactionEnabled: boolean;
messageCount: number; messageCount: number;
queuedMessageCount: number; pendingMessageCount: number;
} }
// ============================================================================ // ============================================================================

View file

@ -144,7 +144,7 @@ describe("AgentSession concurrent prompt guard", () => {
// queueMessage should work while streaming // queueMessage should work while streaming
expect(() => session.queueMessage("Queued message")).not.toThrow(); expect(() => session.queueMessage("Queued message")).not.toThrow();
expect(session.queuedMessageCount).toBe(1); expect(session.pendingMessageCount).toBe(1);
// Cleanup // Cleanup
await session.abort(); await session.abort();