feat: split queue API into steer() and followUp() (#403)

This commit is contained in:
Mario Zechner 2026-01-03 00:25:07 +01:00
commit 93498737c0
35 changed files with 733 additions and 300 deletions

4
.pi/settings.json Normal file
View file

@ -0,0 +1,4 @@
{
"customTools": ["packages/coding-agent/examples/custom-tools/todo/index.ts"],
"hooks": ["packages/coding-agent/examples/hooks/todo/index.ts"]
}

View file

@ -81,5 +81,6 @@ Use these sections under `## [Unreleased]`:
The script handles: version bump, CHANGELOG finalization, commit, tag, publish, and adding new `[Unreleased]` sections.
### Tool Usage
**CTRICIAL**: NEVER use sed/cat to read a file or a range of a file. Always use the read tool (use offset + limit for ranged reads).
### **CRITICAL** Tool Usage Rules **CRITICAL**
- NEVER use sed/cat to read a file or a range of a file. Always use the read tool (use offset + limit for ranged reads).
- You MUST read every file you modify in full before editing.

View file

@ -2,9 +2,21 @@
## [Unreleased]
### Breaking Changes
- **Queue API replaced with steer/followUp**: The `queueMessage()` method has been split into two methods with different delivery semantics ([#403](https://github.com/badlogic/pi-mono/issues/403)):
- `steer(msg)`: Interrupts the agent mid-run. Delivered after current tool execution, skips remaining tools.
- `followUp(msg)`: Waits until the agent finishes. Delivered only when there are no more tool calls or steering messages.
- **Queue mode renamed**: `queueMode` option renamed to `steeringMode`. Added new `followUpMode` option. Both control whether messages are delivered one-at-a-time or all at once.
- **AgentLoopConfig callbacks renamed**: `getQueuedMessages` split into `getSteeringMessages` and `getFollowUpMessages`.
- **Agent methods renamed**:
- `queueMessage()` → `steer()` and `followUp()`
- `clearMessageQueue()` → `clearSteeringQueue()`, `clearFollowUpQueue()`, `clearAllQueues()`
- `setQueueMode()`/`getQueueMode()` → `setSteeringMode()`/`getSteeringMode()` and `setFollowUpMode()`/`getFollowUpMode()`
### Fixed
- `prompt()` and `continue()` now throw if called while the agent is already streaming, preventing race conditions and corrupted state. Use `queueMessage()` to queue messages during streaming, or `await` the previous call.
- `prompt()` and `continue()` now throw if called while the agent is already streaming, preventing race conditions and corrupted state. Use `steer()` or `followUp()` to queue messages during streaming, or `await` the previous call.
## [0.31.1] - 2026-01-02

View file

@ -109,71 +109,88 @@ async function runLoop(
stream: EventStream<AgentEvent, AgentMessage[]>,
streamFn?: StreamFn,
): Promise<void> {
let hasMoreToolCalls = true;
let firstTurn = true;
let queuedMessages: AgentMessage[] = (await config.getQueuedMessages?.()) || [];
let queuedAfterTools: AgentMessage[] | null = null;
// Check for steering messages at start (user may have typed while waiting)
let pendingMessages: AgentMessage[] = (await config.getSteeringMessages?.()) || [];
while (hasMoreToolCalls || queuedMessages.length > 0) {
if (!firstTurn) {
stream.push({ type: "turn_start" });
} else {
firstTurn = false;
}
// Outer loop: continues when queued follow-up messages arrive after agent would stop
while (true) {
let hasMoreToolCalls = true;
let steeringAfterTools: AgentMessage[] | null = null;
// Process queued messages (inject before next assistant response)
if (queuedMessages.length > 0) {
for (const message of queuedMessages) {
stream.push({ type: "message_start", message });
stream.push({ type: "message_end", message });
currentContext.messages.push(message);
newMessages.push(message);
// Inner loop: process tool calls and steering messages
while (hasMoreToolCalls || pendingMessages.length > 0) {
if (!firstTurn) {
stream.push({ type: "turn_start" });
} else {
firstTurn = false;
}
queuedMessages = [];
}
// Stream assistant response
const message = await streamAssistantResponse(currentContext, config, signal, stream, streamFn);
newMessages.push(message);
// Process pending messages (inject before next assistant response)
if (pendingMessages.length > 0) {
for (const message of pendingMessages) {
stream.push({ type: "message_start", message });
stream.push({ type: "message_end", message });
currentContext.messages.push(message);
newMessages.push(message);
}
pendingMessages = [];
}
if (message.stopReason === "error" || message.stopReason === "aborted") {
stream.push({ type: "turn_end", message, toolResults: [] });
stream.push({ type: "agent_end", messages: newMessages });
stream.end(newMessages);
return;
}
// Stream assistant response
const message = await streamAssistantResponse(currentContext, config, signal, stream, streamFn);
newMessages.push(message);
// Check for tool calls
const toolCalls = message.content.filter((c) => c.type === "toolCall");
hasMoreToolCalls = toolCalls.length > 0;
if (message.stopReason === "error" || message.stopReason === "aborted") {
stream.push({ type: "turn_end", message, toolResults: [] });
stream.push({ type: "agent_end", messages: newMessages });
stream.end(newMessages);
return;
}
const toolResults: ToolResultMessage[] = [];
if (hasMoreToolCalls) {
const toolExecution = await executeToolCalls(
currentContext.tools,
message,
signal,
stream,
config.getQueuedMessages,
);
toolResults.push(...toolExecution.toolResults);
queuedAfterTools = toolExecution.queuedMessages ?? null;
// Check for tool calls
const toolCalls = message.content.filter((c) => c.type === "toolCall");
hasMoreToolCalls = toolCalls.length > 0;
for (const result of toolResults) {
currentContext.messages.push(result);
newMessages.push(result);
const toolResults: ToolResultMessage[] = [];
if (hasMoreToolCalls) {
const toolExecution = await executeToolCalls(
currentContext.tools,
message,
signal,
stream,
config.getSteeringMessages,
);
toolResults.push(...toolExecution.toolResults);
steeringAfterTools = toolExecution.steeringMessages ?? null;
for (const result of toolResults) {
currentContext.messages.push(result);
newMessages.push(result);
}
}
stream.push({ type: "turn_end", message, toolResults });
// Get steering messages after turn completes
if (steeringAfterTools && steeringAfterTools.length > 0) {
pendingMessages = steeringAfterTools;
steeringAfterTools = null;
} else {
pendingMessages = (await config.getSteeringMessages?.()) || [];
}
}
stream.push({ type: "turn_end", message, toolResults });
// Get queued messages after turn completes
if (queuedAfterTools && queuedAfterTools.length > 0) {
queuedMessages = queuedAfterTools;
queuedAfterTools = null;
} else {
queuedMessages = (await config.getQueuedMessages?.()) || [];
// Agent would stop here. Check for follow-up messages.
const followUpMessages = (await config.getFollowUpMessages?.()) || [];
if (followUpMessages.length > 0) {
// Set as pending so inner loop processes them
pendingMessages = followUpMessages;
continue;
}
// No more messages, exit
break;
}
stream.push({ type: "agent_end", messages: newMessages });
@ -279,11 +296,11 @@ async function executeToolCalls(
assistantMessage: AssistantMessage,
signal: AbortSignal | undefined,
stream: EventStream<AgentEvent, AgentMessage[]>,
getQueuedMessages?: AgentLoopConfig["getQueuedMessages"],
): Promise<{ toolResults: ToolResultMessage[]; queuedMessages?: AgentMessage[] }> {
getSteeringMessages?: AgentLoopConfig["getSteeringMessages"],
): Promise<{ toolResults: ToolResultMessage[]; steeringMessages?: AgentMessage[] }> {
const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall");
const results: ToolResultMessage[] = [];
let queuedMessages: AgentMessage[] | undefined;
let steeringMessages: AgentMessage[] | undefined;
for (let index = 0; index < toolCalls.length; index++) {
const toolCall = toolCalls[index];
@ -343,11 +360,11 @@ async function executeToolCalls(
stream.push({ type: "message_start", message: toolResultMessage });
stream.push({ type: "message_end", message: toolResultMessage });
// Check for queued messages - skip remaining tools if user interrupted
if (getQueuedMessages) {
const queued = await getQueuedMessages();
if (queued.length > 0) {
queuedMessages = queued;
// Check for steering messages - skip remaining tools if user interrupted
if (getSteeringMessages) {
const steering = await getSteeringMessages();
if (steering.length > 0) {
steeringMessages = steering;
const remainingCalls = toolCalls.slice(index + 1);
for (const skipped of remainingCalls) {
results.push(skipToolCall(skipped, stream));
@ -357,7 +374,7 @@ async function executeToolCalls(
}
}
return { toolResults: results, queuedMessages };
return { toolResults: results, steeringMessages };
}
function skipToolCall(

View file

@ -47,9 +47,14 @@ export interface AgentOptions {
transformContext?: (messages: AgentMessage[], signal?: AbortSignal) => Promise<AgentMessage[]>;
/**
* Queue mode: "all" = send all queued messages at once, "one-at-a-time" = one per turn
* Steering mode: "all" = send all steering messages at once, "one-at-a-time" = one per turn
*/
queueMode?: "all" | "one-at-a-time";
steeringMode?: "all" | "one-at-a-time";
/**
* Follow-up mode: "all" = send all follow-up messages at once, "one-at-a-time" = one per turn
*/
followUpMode?: "all" | "one-at-a-time";
/**
* Custom stream function (for proxy backends, etc.). Default uses streamSimple.
@ -80,8 +85,10 @@ export class Agent {
private abortController?: AbortController;
private convertToLlm: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
private transformContext?: (messages: AgentMessage[], signal?: AbortSignal) => Promise<AgentMessage[]>;
private messageQueue: AgentMessage[] = [];
private queueMode: "all" | "one-at-a-time";
private steeringQueue: AgentMessage[] = [];
private followUpQueue: AgentMessage[] = [];
private steeringMode: "all" | "one-at-a-time";
private followUpMode: "all" | "one-at-a-time";
public streamFn: StreamFn;
public getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
private runningPrompt?: Promise<void>;
@ -91,7 +98,8 @@ export class Agent {
this._state = { ...this._state, ...opts.initialState };
this.convertToLlm = opts.convertToLlm || defaultConvertToLlm;
this.transformContext = opts.transformContext;
this.queueMode = opts.queueMode || "one-at-a-time";
this.steeringMode = opts.steeringMode || "one-at-a-time";
this.followUpMode = opts.followUpMode || "one-at-a-time";
this.streamFn = opts.streamFn || streamSimple;
this.getApiKey = opts.getApiKey;
}
@ -118,12 +126,20 @@ export class Agent {
this._state.thinkingLevel = l;
}
setQueueMode(mode: "all" | "one-at-a-time") {
this.queueMode = mode;
setSteeringMode(mode: "all" | "one-at-a-time") {
this.steeringMode = mode;
}
getQueueMode(): "all" | "one-at-a-time" {
return this.queueMode;
getSteeringMode(): "all" | "one-at-a-time" {
return this.steeringMode;
}
setFollowUpMode(mode: "all" | "one-at-a-time") {
this.followUpMode = mode;
}
getFollowUpMode(): "all" | "one-at-a-time" {
return this.followUpMode;
}
setTools(t: AgentTool<any>[]) {
@ -138,12 +154,33 @@ export class Agent {
this._state.messages = [...this._state.messages, m];
}
queueMessage(m: AgentMessage) {
this.messageQueue.push(m);
/**
* Queue a steering message to interrupt the agent mid-run.
* Delivered after current tool execution, skips remaining tools.
*/
steer(m: AgentMessage) {
this.steeringQueue.push(m);
}
clearMessageQueue() {
this.messageQueue = [];
/**
* Queue a follow-up message to be processed after the agent finishes.
* Delivered only when agent has no more tool calls or steering messages.
*/
followUp(m: AgentMessage) {
this.followUpQueue.push(m);
}
clearSteeringQueue() {
this.steeringQueue = [];
}
clearFollowUpQueue() {
this.followUpQueue = [];
}
clearAllQueues() {
this.steeringQueue = [];
this.followUpQueue = [];
}
clearMessages() {
@ -164,7 +201,8 @@ export class Agent {
this._state.streamMessage = null;
this._state.pendingToolCalls = new Set<string>();
this._state.error = undefined;
this.messageQueue = [];
this.steeringQueue = [];
this.followUpQueue = [];
}
/** Send a prompt with an AgentMessage */
@ -172,7 +210,9 @@ export class Agent {
async prompt(input: string, images?: ImageContent[]): Promise<void>;
async prompt(input: string | AgentMessage | AgentMessage[], images?: ImageContent[]) {
if (this._state.isStreaming) {
throw new Error("Agent is already processing a prompt. Use queueMessage() or wait for completion.");
throw new Error(
"Agent is already processing a prompt. Use steer() or followUp() to queue messages, or wait for completion.",
);
}
const model = this._state.model;
@ -255,18 +295,32 @@ export class Agent {
convertToLlm: this.convertToLlm,
transformContext: this.transformContext,
getApiKey: this.getApiKey,
getQueuedMessages: async () => {
if (this.queueMode === "one-at-a-time") {
if (this.messageQueue.length > 0) {
const first = this.messageQueue[0];
this.messageQueue = this.messageQueue.slice(1);
getSteeringMessages: async () => {
if (this.steeringMode === "one-at-a-time") {
if (this.steeringQueue.length > 0) {
const first = this.steeringQueue[0];
this.steeringQueue = this.steeringQueue.slice(1);
return [first];
}
return [];
} else {
const queued = this.messageQueue.slice();
this.messageQueue = [];
return queued;
const steering = this.steeringQueue.slice();
this.steeringQueue = [];
return steering;
}
},
getFollowUpMessages: async () => {
if (this.followUpMode === "one-at-a-time") {
if (this.followUpQueue.length > 0) {
const first = this.followUpQueue[0];
this.followUpQueue = this.followUpQueue.slice(1);
return [first];
}
return [];
} else {
const followUp = this.followUpQueue.slice();
this.followUpQueue = [];
return followUp;
}
},
};

View file

@ -75,12 +75,26 @@ export interface AgentLoopConfig extends SimpleStreamOptions {
getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
/**
* Returns queued messages to inject into the conversation.
* Returns steering messages to inject into the conversation mid-run.
*
* Called after each turn to check for user interruptions or injected messages.
* If messages are returned, they're added to the context before the next LLM call.
* Called after each tool execution to check for user interruptions.
* If messages are returned, remaining tool calls are skipped and
* these messages are added to the context before the next LLM call.
*
* Use this for "steering" the agent while it's working.
*/
getQueuedMessages?: () => Promise<AgentMessage[]>;
getSteeringMessages?: () => Promise<AgentMessage[]>;
/**
* Returns follow-up messages to process after the agent would otherwise stop.
*
* Called when the agent has no more tool calls and no steering messages.
* If messages are returned, they're added to the context and the agent
* continues with another turn.
*
* Use this for follow-up messages that should wait until the agent finishes.
*/
getFollowUpMessages?: () => Promise<AgentMessage[]>;
}
/**

View file

@ -340,8 +340,8 @@ describe("agentLoop with AgentMessage", () => {
const config: AgentLoopConfig = {
model: createModel(),
convertToLlm: identityConverter,
getQueuedMessages: async () => {
// Return queued message after first tool executes
getSteeringMessages: async () => {
// Return steering message after first tool executes
if (executed.length === 1 && !queuedDelivered) {
queuedDelivered = true;
return [queuedUserMessage];

View file

@ -127,11 +127,21 @@ describe("Agent", () => {
expect(agent.state.messages).toEqual([]);
});
it("should support message queueing", async () => {
it("should support steering message queue", async () => {
const agent = new Agent();
const message = { role: "user" as const, content: "Queued message", timestamp: Date.now() };
agent.queueMessage(message);
const message = { role: "user" as const, content: "Steering message", timestamp: Date.now() };
agent.steer(message);
// The message is queued but not yet in state.messages
expect(agent.state.messages).not.toContainEqual(message);
});
it("should support follow-up message queue", async () => {
const agent = new Agent();
const message = { role: "user" as const, content: "Follow-up message", timestamp: Date.now() };
agent.followUp(message);
// The message is queued but not yet in state.messages
expect(agent.state.messages).not.toContainEqual(message);
@ -176,7 +186,7 @@ describe("Agent", () => {
// Second prompt should reject
await expect(agent.prompt("Second message")).rejects.toThrow(
"Agent is already processing a prompt. Use queueMessage() or wait for completion.",
"Agent is already processing a prompt. Use steer() or followUp() to queue messages, or wait for completion.",
);
// Cleanup - abort to stop the stream

View file

@ -2,13 +2,37 @@
## [Unreleased]
### Breaking Changes
- **Queue API replaced with steer/followUp**: The `queueMessage()` method has been split into two methods with different delivery semantics ([#403](https://github.com/badlogic/pi-mono/issues/403)):
- `steer(text)`: Interrupts the agent mid-run (Enter while streaming). Delivered after current tool execution.
- `followUp(text)`: Waits until the agent finishes (Alt+Enter while streaming). Delivered only when agent stops.
- **Settings renamed**: `queueMode` setting renamed to `steeringMode`. Added new `followUpMode` setting. Old settings.json files are migrated automatically.
- **AgentSession methods renamed**:
- `queueMessage()` → `steer()` and `followUp()`
- `queueMode` getter → `steeringMode` and `followUpMode` getters
- `setQueueMode()` → `setSteeringMode()` and `setFollowUpMode()`
- `queuedMessageCount` → `pendingMessageCount`
- `getQueuedMessages()` → `getSteeringMessages()` and `getFollowUpMessages()`
- `clearQueue()` now returns `{ steering: string[], followUp: string[] }`
- `hasQueuedMessages()` → `hasPendingMessages()`
- **Hook API signature changed**: `pi.sendMessage()` second parameter changed from `triggerTurn?: boolean` to `options?: { triggerTurn?, deliverAs? }`. Use `deliverAs: "followUp"` for follow-up delivery. Affects both hooks and internal `sendHookMessage()` method.
- **RPC API changes**:
- `queue_message` command → `steer` and `follow_up` commands
- `set_queue_mode` command → `set_steering_mode` and `set_follow_up_mode` commands
- `RpcSessionState.queueMode` → `steeringMode` and `followUpMode`
- **Settings UI**: "Queue mode" setting split into "Steering mode" and "Follow-up mode"
### Added
- Alt+Enter keybind to queue follow-up messages while agent is streaming
- `Theme` and `ThemeColor` types now exported for hooks using `ctx.ui.custom()`
- Terminal window title now displays "pi - dirname" to identify which project session you're in ([#407](https://github.com/badlogic/pi-mono/pull/407) by [@kaofelix](https://github.com/kaofelix))
### Fixed
- `AgentSession.prompt()` now throws if called while the agent is already streaming, preventing race conditions. Use `queueMessage()` to queue messages during streaming.
- `AgentSession.prompt()` now throws if called while the agent is already streaming, preventing race conditions. Use `steer()` or `followUp()` to queue messages during streaming.
- Ctrl+C now works like Escape in selector components, so mashing Ctrl+C will eventually close the program ([#400](https://github.com/badlogic/pi-mono/pull/400) by [@mitsuhiko](https://github.com/mitsuhiko))
## [0.31.1] - 2026-01-02

View file

@ -188,7 +188,7 @@ The agent reads, writes, and edits files, and executes commands via bash.
| Command | Description |
|---------|-------------|
| `/settings` | Open settings menu (thinking, theme, queue mode, toggles) |
| `/settings` | Open settings menu (thinking, theme, message delivery modes, toggles) |
| `/model` | Switch models mid-session (fuzzy search, arrow keys, Enter to select) |
| `/export [file]` | Export session to self-contained HTML |
| `/share` | Upload session as secret GitHub gist, get shareable URL (requires `gh` CLI) |
@ -214,7 +214,11 @@ The agent reads, writes, and edits files, and executes commands via bash.
**Multi-line paste:** Pasted content is collapsed to `[paste #N <lines> lines]` but sent in full.
**Message queuing:** Submit messages while the agent is working. They queue and process based on queue mode (configurable via `/settings`). Press Escape to abort and restore queued messages to editor.
**Message queuing:** Submit messages while the agent is working:
- **Enter** queues a *steering* message, delivered after current tool execution (interrupts remaining tools)
- **Alt+Enter** queues a *follow-up* message, delivered only after the agent finishes all work
Both modes are configurable via `/settings`: "one-at-a-time" delivers messages one by one waiting for responses, "all" delivers all queued messages at once. Press Escape to abort and restore queued messages to editor.
### Keyboard Shortcuts
@ -499,7 +503,8 @@ Global `~/.pi/agent/settings.json` stores persistent preferences:
"defaultModel": "claude-sonnet-4-20250514",
"defaultThinkingLevel": "medium",
"enabledModels": ["anthropic/*", "*gpt*", "gemini-2.5-pro:high"],
"queueMode": "one-at-a-time",
"steeringMode": "one-at-a-time",
"followUpMode": "one-at-a-time",
"shellPath": "C:\\path\\to\\bash.exe",
"hideThinkingBlock": false,
"collapseChangelog": false,
@ -531,7 +536,8 @@ Global `~/.pi/agent/settings.json` stores persistent preferences:
| `defaultModel` | Default model ID | - |
| `defaultThinkingLevel` | Thinking level: `off`, `minimal`, `low`, `medium`, `high`, `xhigh` | - |
| `enabledModels` | Model patterns for cycling. Supports glob patterns (`github-copilot/*`, `*sonnet*`) and fuzzy matching. Same as `--models` CLI flag | - |
| `queueMode` | Message queue mode: `all` or `one-at-a-time` | `one-at-a-time` |
| `steeringMode` | Steering message delivery: `all` or `one-at-a-time` | `one-at-a-time` |
| `followUpMode` | Follow-up message delivery: `all` or `one-at-a-time` | `one-at-a-time` |
| `shellPath` | Custom bash path (Windows) | auto-detected |
| `hideThinkingBlock` | Hide thinking blocks in output (Ctrl+T to toggle) | `false` |
| `collapseChangelog` | Show condensed changelog after update | `false` |
@ -689,7 +695,13 @@ export default function (pi: HookAPI) {
**Sending messages from hooks:**
Use `pi.sendMessage(message, triggerTurn?)` to inject messages into the session. Messages are persisted as `CustomMessageEntry` and sent to the LLM. If the agent is streaming, the message is queued; otherwise a new agent loop starts if `triggerTurn` is true.
Use `pi.sendMessage(message, options?)` to inject messages into the session. Messages are persisted as `CustomMessageEntry` and sent to the LLM.
Options:
- `triggerTurn`: If true and agent is idle, starts a new agent turn. Default: false.
- `deliverAs`: When agent is streaming, controls delivery timing:
- `"steer"` (default): Delivered after current tool execution, interrupts remaining tools.
- `"followUp"`: Delivered only after agent finishes all work.
```typescript
import * as fs from "node:fs";

View file

@ -465,10 +465,12 @@ const result = await ctx.ui.custom((tui, theme, done) => {
doWork(loader.signal).then(done).catch(() => done(null));
return loader;
return loader; // Return the component directly, do NOT wrap in Box/Container
});
```
**Important:** Return your component directly from the callback. Do not wrap it in a `Box` or `Container`, as this breaks input handling.
Your component can:
- Implement `handleInput(data: string)` to receive keyboard input
- Implement `render(width: number): string[]` to render lines
@ -563,13 +565,13 @@ Abort the current agent operation (fire-and-forget, does not wait):
await ctx.abort();
```
### ctx.hasQueuedMessages()
### ctx.hasPendingMessages()
Check if there are messages queued (user typed while agent was streaming):
Check if there are messages pending (user typed while agent was streaming):
```typescript
if (ctx.hasQueuedMessages()) {
// Skip interactive prompt, let queued message take over
if (ctx.hasPendingMessages()) {
// Skip interactive prompt, let pending messages take over
return;
}
```
@ -636,7 +638,7 @@ const result = await ctx.navigateTree("entry-id-456", {
Subscribe to events. See [Events](#events) for all event types.
### pi.sendMessage(message, triggerTurn?)
### pi.sendMessage(message, options?)
Inject a message into the session. Creates a `CustomMessageEntry` that participates in the LLM context.
@ -646,12 +648,17 @@ pi.sendMessage({
content: "Message text", // string or (TextContent | ImageContent)[]
display: true, // Show in TUI
details: { ... }, // Optional metadata (not sent to LLM)
}, triggerTurn); // If true, triggers LLM response
}, {
triggerTurn: true, // If true and agent is idle, triggers LLM response
deliverAs: "steer", // "steer" (default) or "followUp" when agent is streaming
});
```
**Storage and timing:**
- The message is appended to the session file immediately as a `CustomMessageEntry`
- If the agent is currently streaming, the message is queued and appended after the current turn
- If the agent is currently streaming:
- `deliverAs: "steer"` (default): Delivered after current tool execution, interrupts remaining tools
- `deliverAs: "followUp"`: Delivered only after agent finishes all work
- If `triggerTurn` is true and the agent is idle, a new agent loop starts
**LLM context:**
@ -698,7 +705,7 @@ pi.registerCommand("stats", {
For long-running commands (e.g., LLM calls), use `ctx.ui.custom()` with a loader. See [examples/hooks/qna.ts](../examples/hooks/qna.ts).
To trigger LLM after command, call `pi.sendMessage(..., true)`.
To trigger LLM after command, call `pi.sendMessage(..., { triggerTurn: true })`.
### pi.registerMessageRenderer(customType, renderer)

View file

@ -13,6 +13,12 @@ Example hooks for intercepting tool calls, adding safety gates, and integrating
### [custom-tools/](custom-tools/)
Example custom tools that extend the agent's capabilities.
## Tool + Hook Combinations
Some examples are designed to work together:
- **todo/** - The [custom tool](custom-tools/todo/) lets the LLM manage a todo list, while the [hook](hooks/todo/) adds a `/todos` command for users to view todos at any time.
## Documentation
- [SDK Reference](sdk/README.md)

View file

@ -19,6 +19,8 @@ Full-featured example demonstrating:
- Proper branching support via details storage
- State management without external files
**Companion hook:** [hooks/todo/](../hooks/todo/) adds a `/todos` command for users to view the todo list.
### subagent/
Delegate tasks to specialized subagents with isolated context windows. Includes:
- `index.ts` - The custom tool (single, parallel, and chain modes)

View file

@ -28,6 +28,7 @@ cp permission-gate.ts ~/.pi/agent/hooks/
| `snake.ts` | Snake game with custom UI, keyboard handling, and session persistence |
| `status-line.ts` | Shows turn progress in footer via `ctx.ui.setStatus()` with themed colors |
| `handoff.ts` | Transfer context to a new focused session via `/handoff <goal>` |
| `todo/` | Adds `/todos` command to view todos managed by the [todo custom tool](../custom-tools/todo/) |
## Writing Hooks

View file

@ -25,7 +25,7 @@ export default function (pi: HookAPI) {
content: `External trigger: ${content}`,
display: true,
},
true, // triggerTurn - get LLM to respond
{ triggerTurn: true }, // triggerTurn - get LLM to respond
);
fs.writeFileSync(triggerFile, ""); // Clear after reading
}

View file

@ -0,0 +1,134 @@
/**
* Todo Hook - Companion to the todo custom tool
*
* Registers a /todos command that displays all todos on the current branch
* with a nice custom UI.
*/
import type { HookAPI, Theme } from "@mariozechner/pi-coding-agent";
import { isCtrlC, isEscape, truncateToWidth } from "@mariozechner/pi-tui";
/** One todo entry, as stored in the todo tool's result details. */
interface Todo {
	// Numeric identifier; rendered in the UI as "#<id>"
	id: number;
	// Display text of the todo
	text: string;
	// Completion state; done items render with a check mark and dimmed text
	done: boolean;
}
/**
 * Shape of the `details` payload attached to todo tool-result messages.
 * Presumably mirrors the state persisted by the companion todo custom
 * tool — confirm against custom-tools/todo/index.ts.
 */
interface TodoDetails {
	// Operation that produced this result
	action: "list" | "add" | "toggle" | "clear";
	// Full todo list after the operation; the latest entry on a branch
	// is treated as the current state when replaying the session
	todos: Todo[];
	// Next id the tool will assign — assumption, verify in the tool source
	nextId: number;
	// Present when the operation failed
	error?: string;
}
/**
 * Read-only TUI component that renders the current todo list.
 *
 * Rendering is cached per width; callers invalidate the cache via
 * `invalidate()` when the underlying data changes. Escape or Ctrl+C
 * dismisses the view through the supplied close callback.
 */
class TodoListComponent {
	private items: Todo[];
	private palette: Theme;
	private close: () => void;
	private lastWidth?: number;
	private lastLines?: string[];

	constructor(todos: Todo[], theme: Theme, onClose: () => void) {
		this.items = todos;
		this.palette = theme;
		this.close = onClose;
	}

	/** Dismiss the component on Escape or Ctrl+C; all other input is ignored. */
	handleInput(data: string): void {
		if (isEscape(data) || isCtrlC(data)) {
			this.close();
		}
	}

	/** Render the todo list as themed lines, truncated to `width`. */
	render(width: number): string[] {
		// Serve the cached render when the width hasn't changed.
		if (this.lastLines && this.lastWidth === width) {
			return this.lastLines;
		}

		const theme = this.palette;
		const out: string[] = [];

		// Header rule with an accented title segment.
		out.push("");
		const title = theme.fg("accent", " Todos ");
		const header =
			theme.fg("borderMuted", "─".repeat(3)) + title + theme.fg("borderMuted", "─".repeat(Math.max(0, width - 10)));
		out.push(truncateToWidth(header, width));
		out.push("");

		if (this.items.length === 0) {
			out.push(truncateToWidth(` ${theme.fg("dim", "No todos yet. Ask the agent to add some!")}`, width));
		} else {
			// Completion summary line, e.g. "2/5 completed".
			const completed = this.items.filter((t) => t.done).length;
			out.push(truncateToWidth(` ${theme.fg("muted", `${completed}/${this.items.length} completed`)}`, width));
			out.push("");

			// One line per todo: check mark, id, then text (dimmed when done).
			for (const item of this.items) {
				const marker = item.done ? theme.fg("success", "✓") : theme.fg("dim", "○");
				const label = theme.fg("accent", `#${item.id}`);
				const body = item.done ? theme.fg("dim", item.text) : theme.fg("text", item.text);
				out.push(truncateToWidth(` ${marker} ${label} ${body}`, width));
			}
		}

		out.push("");
		out.push(truncateToWidth(` ${theme.fg("dim", "Press Escape to close")}`, width));
		out.push("");

		this.lastWidth = width;
		this.lastLines = out;
		return out;
	}

	/** Drop the cached render so the next `render()` call recomputes. */
	invalidate(): void {
		this.lastWidth = undefined;
		this.lastLines = undefined;
	}
}
export default function (pi: HookAPI) {
	/**
	 * Reconstruct the current todo list from session entries on the branch.
	 * The most recent todo tool result with a `details` payload holds the
	 * complete list, so we scan backwards and return the first match.
	 */
	function collectTodos(ctx: {
		sessionManager: {
			getBranch: () => Array<{ type: string; message?: { role?: string; toolName?: string; details?: unknown } }>;
		};
	}): Todo[] {
		const entries = ctx.sessionManager.getBranch();
		for (let i = entries.length - 1; i >= 0; i--) {
			const entry = entries[i];
			if (entry.type !== "message") continue;
			const msg = entry.message;
			if (!msg || msg.role !== "toolResult" || msg.toolName !== "todo") continue;
			const details = msg.details as TodoDetails | undefined;
			if (details) return details.todos;
		}
		return [];
	}

	pi.registerCommand("todos", {
		description: "Show all todos on the current branch",
		handler: async (_args, ctx) => {
			// The custom component needs an interactive terminal.
			if (!ctx.hasUI) {
				ctx.ui.notify("/todos requires interactive mode", "error");
				return;
			}
			const todos = collectTodos(ctx);
			await ctx.ui.custom<void>((_tui, theme, done) => new TodoListComponent(todos, theme, () => done()));
		},
	});
}

View file

@ -138,8 +138,10 @@ export class AgentSession {
private _unsubscribeAgent?: () => void;
private _eventListeners: AgentSessionEventListener[] = [];
// Message queue state
private _queuedMessages: string[] = [];
/** Tracks pending steering messages for UI display. Removed when delivered. */
private _steeringMessages: string[] = [];
/** Tracks pending follow-up messages for UI display. Removed when delivered. */
private _followUpMessages: string[] = [];
// Compaction state
private _compactionAbortController: AbortController | undefined = undefined;
@ -207,16 +209,21 @@ export class AgentSession {
/** Internal handler for agent events - shared by subscribe and reconnect */
private _handleAgentEvent = async (event: AgentEvent): Promise<void> => {
// When a user message starts, check if it's from the queue and remove it BEFORE emitting
// When a user message starts, check if it's from either queue and remove it BEFORE emitting
// This ensures the UI sees the updated queue state
if (event.type === "message_start" && event.message.role === "user" && this._queuedMessages.length > 0) {
// Extract text content from the message
if (event.type === "message_start" && event.message.role === "user") {
const messageText = this._getUserMessageText(event.message);
if (messageText && this._queuedMessages.includes(messageText)) {
// Remove the first occurrence of this message from the queue
const index = this._queuedMessages.indexOf(messageText);
if (index !== -1) {
this._queuedMessages.splice(index, 1);
if (messageText) {
// Check steering queue first
const steeringIndex = this._steeringMessages.indexOf(messageText);
if (steeringIndex !== -1) {
this._steeringMessages.splice(steeringIndex, 1);
} else {
// Check follow-up queue
const followUpIndex = this._followUpMessages.indexOf(messageText);
if (followUpIndex !== -1) {
this._followUpMessages.splice(followUpIndex, 1);
}
}
}
}
@ -418,9 +425,14 @@ export class AgentSession {
return this.agent.state.messages;
}
/** Current queue mode */
get queueMode(): "all" | "one-at-a-time" {
return this.agent.getQueueMode();
/** Current steering mode */
get steeringMode(): "all" | "one-at-a-time" {
return this.agent.getSteeringMode();
}
/** Current follow-up mode */
get followUpMode(): "all" | "one-at-a-time" {
return this.agent.getFollowUpMode();
}
/** Current session file path, or undefined if sessions are disabled */
@ -456,7 +468,7 @@ export class AgentSession {
*/
async prompt(text: string, options?: PromptOptions): Promise<void> {
if (this.isStreaming) {
throw new Error("Agent is already processing. Use queueMessage() to queue messages during streaming.");
throw new Error("Agent is already processing. Use steer() or followUp() to queue messages during streaming.");
}
// Flush any pending bash messages before the new prompt
@ -565,12 +577,25 @@ export class AgentSession {
}
/**
* Queue a message to be sent after the current response completes.
* Use when agent is currently streaming.
* Queue a steering message to interrupt the agent mid-run.
* Delivered after current tool execution, skips remaining tools.
*/
async steer(text: string): Promise<void> {
// Track the message for UI display first; the message_start handler
// removes the entry once the agent actually delivers it.
this._steeringMessages.push(text);
try {
this.agent.steer({
role: "user",
content: [{ type: "text", text }],
timestamp: Date.now(),
});
} catch (error) {
// Roll back the tracking entry so a synchronous failure in
// agent.steer() does not leave a phantom "pending" message on screen.
const index = this._steeringMessages.lastIndexOf(text);
if (index !== -1) {
this._steeringMessages.splice(index, 1);
}
throw error;
}
}
/**
* Queue a follow-up message to be processed after the agent finishes.
* Delivered only when agent has no more tool calls or steering messages.
*/
async followUp(text: string): Promise<void> {
this._followUpMessages.push(text);
this.agent.followUp({
role: "user",
content: [{ type: "text", text }],
timestamp: Date.now(),
@ -586,11 +611,12 @@ export class AgentSession {
* - Not streaming + no trigger: appends to state/session, no turn
*
* @param message Hook message with customType, content, display, details
* @param triggerTurn If true and not streaming, triggers a new LLM turn
* @param options.triggerTurn If true and not streaming, triggers a new LLM turn
* @param options.deliverAs When streaming, use "steer" (default) for immediate or "followUp" to wait
*/
async sendHookMessage<T = unknown>(
message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">,
triggerTurn?: boolean,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" },
): Promise<void> {
const appMessage = {
role: "hookMessage" as const,
@ -602,8 +628,12 @@ export class AgentSession {
} satisfies HookMessage<T>;
if (this.isStreaming) {
// Queue for processing by agent loop
await this.agent.queueMessage(appMessage);
} else if (triggerTurn) {
if (options?.deliverAs === "followUp") {
this.agent.followUp(appMessage);
} else {
this.agent.steer(appMessage);
}
} else if (options?.triggerTurn) {
// Send as prompt - agent loop will emit message events
await this.agent.prompt(appMessage);
} else {
@ -619,24 +649,32 @@ export class AgentSession {
}
/**
* Clear queued messages and return them.
* Clear all queued messages and return them.
* Useful for restoring to editor when user aborts.
* @returns Object with steering and followUp arrays
*/
clearQueue(): { steering: string[]; followUp: string[] } {
// Snapshot both queues so callers (e.g. the restore-to-editor-on-abort
// path) still receive the pending text after the wipe.
const snapshot = {
steering: this._steeringMessages.slice(),
followUp: this._followUpMessages.slice(),
};
this._steeringMessages = [];
this._followUpMessages = [];
// Drop the agent-side queues too, so nothing is delivered later.
this.agent.clearAllQueues();
return snapshot;
}
/** Number of messages currently queued */
get queuedMessageCount(): number {
return this._queuedMessages.length;
/** Number of pending messages (includes both steering and follow-up) */
get pendingMessageCount(): number {
return this._steeringMessages.length + this._followUpMessages.length;
}
/** Get queued messages (read-only) */
getQueuedMessages(): readonly string[] {
return this._queuedMessages;
/** Get pending steering messages (read-only) */
getSteeringMessages(): readonly string[] {
return this._steeringMessages;
}
/** Get pending follow-up messages (read-only) */
getFollowUpMessages(): readonly string[] {
return this._followUpMessages;
}
get skillsSettings(): Required<SkillsSettings> | undefined {
@ -678,7 +716,8 @@ export class AgentSession {
await this.abort();
this.agent.reset();
this.sessionManager.newSession(options);
this._queuedMessages = [];
this._steeringMessages = [];
this._followUpMessages = [];
this._reconnectToAgent();
// Emit session_switch event with reason "new" to hooks
@ -856,12 +895,21 @@ export class AgentSession {
// =========================================================================
/**
* Set message queue mode.
* Set steering message mode.
* Saves to settings.
*/
setQueueMode(mode: "all" | "one-at-a-time"): void {
this.agent.setQueueMode(mode);
this.settingsManager.setQueueMode(mode);
setSteeringMode(mode: "all" | "one-at-a-time"): void {
this.agent.setSteeringMode(mode);
this.settingsManager.setSteeringMode(mode);
}
/**
* Set follow-up message mode.
* Saves to settings.
*/
setFollowUpMode(mode: "all" | "one-at-a-time"): void {
this.agent.setFollowUpMode(mode);
this.settingsManager.setFollowUpMode(mode);
}
// =========================================================================
@ -1450,7 +1498,8 @@ export class AgentSession {
this._disconnectFromAgent();
await this.abort();
this._queuedMessages = [];
this._steeringMessages = [];
this._followUpMessages = [];
// Set new session
this.sessionManager.setSessionFile(sessionPath);
@ -1882,7 +1931,7 @@ export class AgentSession {
modelRegistry: this._modelRegistry,
model: this.agent.state.model,
isIdle: () => !this.isStreaming,
hasQueuedMessages: () => this.queuedMessageCount > 0,
hasPendingMessages: () => this.pendingMessageCount > 0,
abort: () => {
this.abort();
},

View file

@ -50,7 +50,7 @@ export interface CustomToolContext {
/** Whether the agent is idle (not streaming) */
isIdle(): boolean;
/** Whether there are queued messages waiting to be processed */
hasQueuedMessages(): boolean;
hasPendingMessages(): boolean;
/** Abort the current agent operation (fire-and-forget, does not wait) */
abort(): void;
}

View file

@ -53,7 +53,7 @@ type HandlerFn = (...args: unknown[]) => Promise<unknown>;
*/
export type SendMessageHandler = <T = unknown>(
message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">,
triggerTurn?: boolean,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" },
) => void;
/**
@ -177,8 +177,11 @@ function createHookAPI(
list.push(handler);
handlers.set(event, list);
},
sendMessage<T = unknown>(message: HookMessage<T>, triggerTurn?: boolean): void {
sendMessageHandler(message, triggerTurn);
sendMessage<T = unknown>(
message: HookMessage<T>,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" },
): void {
sendMessageHandler(message, options);
},
appendEntry<T = unknown>(customType: string, data?: T): void {
appendEntryHandler(customType, data);

View file

@ -73,7 +73,7 @@ export class HookRunner {
private isIdleFn: () => boolean = () => true;
private waitForIdleFn: () => Promise<void> = async () => {};
private abortFn: () => void = () => {};
private hasQueuedMessagesFn: () => boolean = () => false;
private hasPendingMessagesFn: () => boolean = () => false;
private newSessionHandler: NewSessionHandler = async () => ({ cancelled: false });
private branchHandler: BranchHandler = async () => ({ cancelled: false });
private navigateTreeHandler: NavigateTreeHandler = async () => ({ cancelled: false });
@ -111,7 +111,7 @@ export class HookRunner {
/** Function to abort current operation (fire-and-forget) */
abort?: () => void;
/** Function to check if there are queued messages */
hasQueuedMessages?: () => boolean;
hasPendingMessages?: () => boolean;
/** UI context for interactive prompts */
uiContext?: HookUIContext;
/** Whether UI is available */
@ -121,7 +121,7 @@ export class HookRunner {
this.isIdleFn = options.isIdle ?? (() => true);
this.waitForIdleFn = options.waitForIdle ?? (async () => {});
this.abortFn = options.abort ?? (() => {});
this.hasQueuedMessagesFn = options.hasQueuedMessages ?? (() => false);
this.hasPendingMessagesFn = options.hasPendingMessages ?? (() => false);
// Store session handlers for HookCommandContext
if (options.newSessionHandler) {
this.newSessionHandler = options.newSessionHandler;
@ -250,7 +250,7 @@ export class HookRunner {
model: this.getModel(),
isIdle: () => this.isIdleFn(),
abort: () => this.abortFn(),
hasQueuedMessages: () => this.hasQueuedMessagesFn(),
hasPendingMessages: () => this.hasPendingMessagesFn(),
};
}

View file

@ -161,7 +161,7 @@ export interface HookContext {
/** Abort the current agent operation (fire-and-forget, does not wait) */
abort(): void;
/** Whether there are queued messages waiting to be processed */
hasQueuedMessages(): boolean;
hasPendingMessages(): boolean;
}
/**
@ -692,12 +692,15 @@ export interface HookAPI {
* @param message.content - Message content (string or TextContent/ImageContent array)
* @param message.display - Whether to show in TUI (true = styled display, false = hidden)
* @param message.details - Optional hook-specific metadata (not sent to LLM)
* @param triggerTurn - If true and agent is idle, triggers a new LLM turn. Default: false.
* If agent is streaming, message is queued and triggerTurn is ignored.
* @param options.triggerTurn - If true and agent is idle, triggers a new LLM turn. Default: false.
* If agent is streaming, message is queued and triggerTurn is ignored.
* @param options.deliverAs - How to deliver when agent is streaming. Default: "steer".
* - "steer": Interrupt mid-run, delivered after current tool execution.
* - "followUp": Wait until agent finishes all work before delivery.
*/
sendMessage<T = unknown>(
message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">,
triggerTurn?: boolean,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" },
): void;
/**

View file

@ -305,7 +305,8 @@ export function loadSettings(cwd?: string, agentDir?: string): Settings {
defaultProvider: manager.getDefaultProvider(),
defaultModel: manager.getDefaultModel(),
defaultThinkingLevel: manager.getDefaultThinkingLevel(),
queueMode: manager.getQueueMode(),
steeringMode: manager.getSteeringMode(),
followUpMode: manager.getFollowUpMode(),
theme: manager.getTheme(),
compaction: manager.getCompactionSettings(),
retry: manager.getRetrySettings(),
@ -343,7 +344,10 @@ function createLoadedHooksFromDefinitions(definitions: Array<{ path?: string; fa
const handlers = new Map<string, Array<(...args: unknown[]) => Promise<unknown>>>();
const messageRenderers = new Map<string, any>();
const commands = new Map<string, any>();
let sendMessageHandler: (message: any, triggerTurn?: boolean) => void = () => {};
let sendMessageHandler: (
message: any,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" },
) => void = () => {};
let appendEntryHandler: (customType: string, data?: any) => void = () => {};
let newSessionHandler: (options?: any) => Promise<{ cancelled: boolean }> = async () => ({ cancelled: false });
let branchHandler: (entryId: string) => Promise<{ cancelled: boolean }> = async () => ({ cancelled: false });
@ -357,8 +361,8 @@ function createLoadedHooksFromDefinitions(definitions: Array<{ path?: string; fa
list.push(handler);
handlers.set(event, list);
},
sendMessage: (message: any, triggerTurn?: boolean) => {
sendMessageHandler(message, triggerTurn);
sendMessage: (message: any, options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" }) => {
sendMessageHandler(message, options);
},
appendEntry: (customType: string, data?: any) => {
appendEntryHandler(customType, data);
@ -382,7 +386,9 @@ function createLoadedHooksFromDefinitions(definitions: Array<{ path?: string; fa
handlers,
messageRenderers,
commands,
setSendMessageHandler: (handler: (message: any, triggerTurn?: boolean) => void) => {
setSendMessageHandler: (
handler: (message: any, options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" }) => void,
) => {
sendMessageHandler = handler;
},
setAppendEntryHandler: (handler: (customType: string, data?: any) => void) => {
@ -575,7 +581,7 @@ export async function createAgentSession(options: CreateAgentSessionOptions = {}
modelRegistry,
model: agent.state.model,
isIdle: () => !session.isStreaming,
hasQueuedMessages: () => session.queuedMessageCount > 0,
hasPendingMessages: () => session.pendingMessageCount > 0,
abort: () => {
session.abort();
},
@ -626,7 +632,8 @@ export async function createAgentSession(options: CreateAgentSessionOptions = {}
return hookRunner.emitContext(messages);
}
: undefined,
queueMode: settingsManager.getQueueMode(),
steeringMode: settingsManager.getSteeringMode(),
followUpMode: settingsManager.getFollowUpMode(),
getApiKey: async () => {
const currentModel = agent.state.model;
if (!currentModel) {

View file

@ -39,7 +39,8 @@ export interface Settings {
defaultProvider?: string;
defaultModel?: string;
defaultThinkingLevel?: "off" | "minimal" | "low" | "medium" | "high" | "xhigh";
queueMode?: "all" | "one-at-a-time";
steeringMode?: "all" | "one-at-a-time";
followUpMode?: "all" | "one-at-a-time";
theme?: string;
compaction?: CompactionSettings;
branchSummary?: BranchSummarySettings;
@ -125,13 +126,24 @@ export class SettingsManager {
}
try {
const content = readFileSync(path, "utf-8");
return JSON.parse(content);
const settings = JSON.parse(content);
return SettingsManager.migrateSettings(settings);
} catch (error) {
console.error(`Warning: Could not read settings file ${path}: ${error}`);
return {};
}
}
/**
 * Migrate old settings format to new format.
 *
 * Currently handles the queueMode -> steeringMode rename. The input object
 * is not mutated; a migrated shallow copy is returned when a rewrite is
 * needed, otherwise the original object is passed through.
 */
private static migrateSettings(settings: Record<string, unknown>): Settings {
// Only migrate when the new key is absent, so an explicitly set
// steeringMode always wins over a leftover queueMode.
if ("queueMode" in settings && !("steeringMode" in settings)) {
const { queueMode, ...rest } = settings;
return { ...rest, steeringMode: queueMode } as Settings;
}
return settings as Settings;
}
private loadProjectSettings(): Settings {
if (!this.projectSettingsPath || !existsSync(this.projectSettingsPath)) {
return {};
@ -139,7 +151,8 @@ export class SettingsManager {
try {
const content = readFileSync(this.projectSettingsPath, "utf-8");
return JSON.parse(content);
const settings = JSON.parse(content);
return SettingsManager.migrateSettings(settings);
} catch (error) {
console.error(`Warning: Could not read project settings file: ${error}`);
return {};
@ -204,12 +217,21 @@ export class SettingsManager {
this.save();
}
getQueueMode(): "all" | "one-at-a-time" {
return this.settings.queueMode || "one-at-a-time";
getSteeringMode(): "all" | "one-at-a-time" {
return this.settings.steeringMode || "one-at-a-time";
}
setQueueMode(mode: "all" | "one-at-a-time"): void {
this.globalSettings.queueMode = mode;
setSteeringMode(mode: "all" | "one-at-a-time"): void {
this.globalSettings.steeringMode = mode;
this.save();
}
getFollowUpMode(): "all" | "one-at-a-time" {
return this.settings.followUpMode || "one-at-a-time";
}
setFollowUpMode(mode: "all" | "one-at-a-time"): void {
this.globalSettings.followUpMode = mode;
this.save();
}

View file

@ -157,5 +157,5 @@ export {
export { main } from "./main.js";
// UI components for hooks
export { BorderedLoader } from "./modes/interactive/components/bordered-loader.js";
// Theme utilities for custom tools
export { getMarkdownTheme } from "./modes/interactive/theme/theme.js";
// Theme utilities for custom tools and hooks
export { getMarkdownTheme, Theme, type ThemeColor } from "./modes/interactive/theme/theme.js";

View file

@ -1,5 +1,6 @@
import {
Editor,
isAltEnter,
isCtrlC,
isCtrlD,
isCtrlG,
@ -28,8 +29,14 @@ export class CustomEditor extends Editor {
public onCtrlT?: () => void;
public onCtrlG?: () => void;
public onCtrlZ?: () => void;
public onAltEnter?: () => void;
handleInput(data: string): void {
// Intercept Alt+Enter for follow-up messages
if (isAltEnter(data) && this.onAltEnter) {
this.onAltEnter();
return;
}
// Intercept Ctrl+G for external editor
if (isCtrlG(data) && this.onCtrlG) {
this.onCtrlG();

View file

@ -1,56 +0,0 @@
import { Container, type SelectItem, SelectList } from "@mariozechner/pi-tui";
import { getSelectListTheme } from "../theme/theme.js";
import { DynamicBorder } from "./dynamic-border.js";
/**
 * Bordered selector overlay for choosing the message queue mode
 * ("one-at-a-time" vs "all"). Invokes onSelect/onCancel; dismissal is left
 * to the caller.
 */
export class QueueModeSelectorComponent extends Container {
private selectList: SelectList;
constructor(
currentMode: "all" | "one-at-a-time",
onSelect: (mode: "all" | "one-at-a-time") => void,
onCancel: () => void,
) {
super();
// Available modes, in display order.
const items: SelectItem[] = [
{
value: "one-at-a-time",
label: "one-at-a-time",
description: "Process queued messages one by one (recommended)",
},
{ value: "all", label: "all", description: "Process all queued messages at once" },
];
const list = new SelectList(items, 2, getSelectListTheme());
// Preselect whichever mode is currently active.
const preselected = items.findIndex((entry) => entry.value === currentMode);
if (preselected >= 0) {
list.setSelectedIndex(preselected);
}
list.onSelect = (entry) => {
onSelect(entry.value as "all" | "one-at-a-time");
};
list.onCancel = () => {
onCancel();
};
this.selectList = list;
// Frame the list between a top and bottom border.
this.addChild(new DynamicBorder());
this.addChild(list);
this.addChild(new DynamicBorder());
}
/** Expose the inner list so the caller can focus it / route input to it. */
getSelectList(): SelectList {
return this.selectList;
}
}

View file

@ -24,7 +24,8 @@ const THINKING_DESCRIPTIONS: Record<ThinkingLevel, string> = {
export interface SettingsConfig {
autoCompact: boolean;
showImages: boolean;
queueMode: "all" | "one-at-a-time";
steeringMode: "all" | "one-at-a-time";
followUpMode: "all" | "one-at-a-time";
thinkingLevel: ThinkingLevel;
availableThinkingLevels: ThinkingLevel[];
currentTheme: string;
@ -36,7 +37,8 @@ export interface SettingsConfig {
export interface SettingsCallbacks {
onAutoCompactChange: (enabled: boolean) => void;
onShowImagesChange: (enabled: boolean) => void;
onQueueModeChange: (mode: "all" | "one-at-a-time") => void;
onSteeringModeChange: (mode: "all" | "one-at-a-time") => void;
onFollowUpModeChange: (mode: "all" | "one-at-a-time") => void;
onThinkingLevelChange: (level: ThinkingLevel) => void;
onThemeChange: (theme: string) => void;
onThemePreview?: (theme: string) => void;
@ -127,10 +129,19 @@ export class SettingsSelectorComponent extends Container {
values: ["true", "false"],
},
{
id: "queue-mode",
label: "Queue mode",
description: "How to process queued messages while agent is working",
currentValue: config.queueMode,
id: "steering-mode",
label: "Steering mode",
description:
"Enter while streaming queues steering messages. 'one-at-a-time': deliver one, wait for response. 'all': deliver all at once.",
currentValue: config.steeringMode,
values: ["one-at-a-time", "all"],
},
{
id: "follow-up-mode",
label: "Follow-up mode",
description:
"Alt+Enter queues follow-up messages until agent stops. 'one-at-a-time': deliver one, wait for response. 'all': deliver all at once.",
currentValue: config.followUpMode,
values: ["one-at-a-time", "all"],
},
{
@ -227,8 +238,11 @@ export class SettingsSelectorComponent extends Container {
case "show-images":
callbacks.onShowImagesChange(newValue === "true");
break;
case "queue-mode":
callbacks.onQueueModeChange(newValue as "all" | "one-at-a-time");
case "steering-mode":
callbacks.onSteeringModeChange(newValue as "all" | "one-at-a-time");
break;
case "follow-up-mode":
callbacks.onFollowUpModeChange(newValue as "all" | "one-at-a-time");
break;
case "hide-thinking":
callbacks.onHideThinkingBlockChange(newValue === "true");

View file

@ -262,6 +262,9 @@ export class InteractiveMode {
theme.fg("dim", "!") +
theme.fg("muted", " to run bash") +
"\n" +
theme.fg("dim", "alt+enter") +
theme.fg("muted", " to queue follow-up") +
"\n" +
theme.fg("dim", "drop files") +
theme.fg("muted", " to attach");
const header = new Text(`${logo}\n${instructions}`, 1, 0);
@ -401,10 +404,10 @@ export class InteractiveMode {
hookRunner.initialize({
getModel: () => this.session.model,
sendMessageHandler: (message, triggerTurn) => {
sendMessageHandler: (message, options) => {
const wasStreaming = this.session.isStreaming;
this.session
.sendHookMessage(message, triggerTurn)
.sendHookMessage(message, options)
.then(() => {
// For non-streaming cases with display=true, update UI
// (streaming cases update via message_end event)
@ -486,7 +489,7 @@ export class InteractiveMode {
abort: () => {
this.session.abort();
},
hasQueuedMessages: () => this.session.queuedMessageCount > 0,
hasPendingMessages: () => this.session.pendingMessageCount > 0,
uiContext,
hasUI: true,
});
@ -522,7 +525,7 @@ export class InteractiveMode {
modelRegistry: this.session.modelRegistry,
model: this.session.model,
isIdle: () => !this.session.isStreaming,
hasQueuedMessages: () => this.session.queuedMessageCount > 0,
hasPendingMessages: () => this.session.pendingMessageCount > 0,
abort: () => {
this.session.abort();
},
@ -737,8 +740,9 @@ export class InteractiveMode {
this.editor.onEscape = () => {
if (this.loadingAnimation) {
// Abort and restore queued messages to editor
const queuedMessages = this.session.clearQueue();
const queuedText = queuedMessages.join("\n\n");
const { steering, followUp } = this.session.clearQueue();
const allQueued = [...steering, ...followUp];
const queuedText = allQueued.join("\n\n");
const currentText = this.editor.getText();
const combinedText = [queuedText, currentText].filter((t) => t.trim()).join("\n\n");
this.editor.setText(combinedText);
@ -775,6 +779,7 @@ export class InteractiveMode {
this.editor.onCtrlO = () => this.toggleToolOutputExpansion();
this.editor.onCtrlT = () => this.toggleThinkingBlockVisibility();
this.editor.onCtrlG = () => this.openExternalEditor();
this.editor.onAltEnter = () => this.handleAltEnter();
this.editor.onChange = (text: string) => {
const wasBashMode = this.isBashMode;
@ -919,9 +924,9 @@ export class InteractiveMode {
}
}
// Queue regular messages if agent is streaming
// Queue steering message if agent is streaming (interrupts current work)
if (this.session.isStreaming) {
await this.session.queueMessage(text);
await this.session.steer(text);
this.updatePendingMessagesDisplay();
this.editor.addToHistory(text);
this.editor.setText("");
@ -1446,6 +1451,24 @@ export class InteractiveMode {
process.kill(0, "SIGTSTP");
}
/** Alt+Enter: queue a follow-up while streaming, otherwise act like Enter. */
private async handleAltEnter(): Promise<void> {
const text = this.editor.getText().trim();
if (!text) {
return;
}
if (!this.session.isStreaming) {
// Idle: fall through to the normal submit path.
this.editor.onSubmit?.(text);
return;
}
// Streaming: enqueue as a follow-up (delivered once the agent finishes),
// then reset the editor and refresh the pending-message display.
await this.session.followUp(text);
this.updatePendingMessagesDisplay();
this.editor.addToHistory(text);
this.editor.setText("");
this.ui.requestRender();
}
private updateEditorBorderColor(): void {
if (this.isBashMode) {
this.editor.borderColor = theme.getBashModeBorderColor();
@ -1599,12 +1622,17 @@ export class InteractiveMode {
private updatePendingMessagesDisplay(): void {
this.pendingMessagesContainer.clear();
const queuedMessages = this.session.getQueuedMessages();
if (queuedMessages.length > 0) {
const steeringMessages = this.session.getSteeringMessages();
const followUpMessages = this.session.getFollowUpMessages();
if (steeringMessages.length > 0 || followUpMessages.length > 0) {
this.pendingMessagesContainer.addChild(new Spacer(1));
for (const message of queuedMessages) {
const queuedText = theme.fg("dim", `Queued: ${message}`);
this.pendingMessagesContainer.addChild(new TruncatedText(queuedText, 1, 0));
for (const message of steeringMessages) {
const text = theme.fg("dim", `Steering: ${message}`);
this.pendingMessagesContainer.addChild(new TruncatedText(text, 1, 0));
}
for (const message of followUpMessages) {
const text = theme.fg("dim", `Follow-up: ${message}`);
this.pendingMessagesContainer.addChild(new TruncatedText(text, 1, 0));
}
}
}
@ -1645,7 +1673,8 @@ export class InteractiveMode {
{
autoCompact: this.session.autoCompactionEnabled,
showImages: this.settingsManager.getShowImages(),
queueMode: this.session.queueMode,
steeringMode: this.session.steeringMode,
followUpMode: this.session.followUpMode,
thinkingLevel: this.session.thinkingLevel,
availableThinkingLevels: this.session.getAvailableThinkingLevels(),
currentTheme: this.settingsManager.getTheme() || "dark",
@ -1666,8 +1695,11 @@ export class InteractiveMode {
}
}
},
onQueueModeChange: (mode) => {
this.session.setQueueMode(mode);
onSteeringModeChange: (mode) => {
this.session.setSteeringMode(mode);
},
onFollowUpModeChange: (mode) => {
this.session.setFollowUpMode(mode);
},
onThinkingLevelChange: (level) => {
this.session.setThinkingLevel(level);

View file

@ -32,8 +32,8 @@ export async function runPrintMode(
if (hookRunner) {
hookRunner.initialize({
getModel: () => session.model,
sendMessageHandler: (message, triggerTurn) => {
session.sendHookMessage(message, triggerTurn).catch((e) => {
sendMessageHandler: (message, options) => {
session.sendHookMessage(message, options).catch((e) => {
console.error(`Hook sendMessage failed: ${e instanceof Error ? e.message : String(e)}`);
});
},
@ -64,7 +64,7 @@ export async function runPrintMode(
modelRegistry: session.modelRegistry,
model: session.model,
isIdle: () => !session.isStreaming,
hasQueuedMessages: () => session.queuedMessageCount > 0,
hasPendingMessages: () => session.pendingMessageCount > 0,
abort: () => {
session.abort();
},

View file

@ -173,10 +173,17 @@ export class RpcClient {
}
/**
* Queue a message while agent is streaming.
* Queue a steering message to interrupt the agent mid-run.
*/
async queueMessage(message: string): Promise<void> {
await this.send({ type: "queue_message", message });
async steer(message: string): Promise<void> {
await this.send({ type: "steer", message });
}
/**
* Queue a follow-up message to be processed after the agent finishes.
*/
async followUp(message: string): Promise<void> {
await this.send({ type: "follow_up", message });
}
/**
@ -248,10 +255,17 @@ export class RpcClient {
}
/**
* Set queue mode.
* Set steering mode.
*/
async setQueueMode(mode: "all" | "one-at-a-time"): Promise<void> {
await this.send({ type: "set_queue_mode", mode });
async setSteeringMode(mode: "all" | "one-at-a-time"): Promise<void> {
await this.send({ type: "set_steering_mode", mode });
}
/**
* Set follow-up mode.
*/
async setFollowUpMode(mode: "all" | "one-at-a-time"): Promise<void> {
await this.send({ type: "set_follow_up_mode", mode });
}
/**

View file

@ -181,8 +181,8 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
if (hookRunner) {
hookRunner.initialize({
getModel: () => session.agent.state.model,
sendMessageHandler: (message, triggerTurn) => {
session.sendHookMessage(message, triggerTurn).catch((e) => {
sendMessageHandler: (message, options) => {
session.sendHookMessage(message, options).catch((e) => {
output(error(undefined, "hook_send", e.message));
});
},
@ -216,7 +216,7 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
modelRegistry: session.modelRegistry,
model: session.model,
isIdle: () => !session.isStreaming,
hasQueuedMessages: () => session.queuedMessageCount > 0,
hasPendingMessages: () => session.pendingMessageCount > 0,
abort: () => {
session.abort();
},
@ -253,9 +253,14 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
return success(id, "prompt");
}
case "queue_message": {
await session.queueMessage(command.message);
return success(id, "queue_message");
case "steer": {
await session.steer(command.message);
return success(id, "steer");
}
case "follow_up": {
await session.followUp(command.message);
return success(id, "follow_up");
}
case "abort": {
@ -279,12 +284,13 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
thinkingLevel: session.thinkingLevel,
isStreaming: session.isStreaming,
isCompacting: session.isCompacting,
queueMode: session.queueMode,
steeringMode: session.steeringMode,
followUpMode: session.followUpMode,
sessionFile: session.sessionFile,
sessionId: session.sessionId,
autoCompactionEnabled: session.autoCompactionEnabled,
messageCount: session.messages.length,
queuedMessageCount: session.queuedMessageCount,
pendingMessageCount: session.pendingMessageCount,
};
return success(id, "get_state", state);
}
@ -334,12 +340,17 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
}
// =================================================================
// Queue Mode
// Queue Modes
// =================================================================
case "set_queue_mode": {
session.setQueueMode(command.mode);
return success(id, "set_queue_mode");
case "set_steering_mode": {
session.setSteeringMode(command.mode);
return success(id, "set_steering_mode");
}
case "set_follow_up_mode": {
session.setFollowUpMode(command.mode);
return success(id, "set_follow_up_mode");
}
// =================================================================

View file

@ -18,7 +18,8 @@ import type { CompactionResult } from "../../core/compaction/index.js";
export type RpcCommand =
// Prompting
| { id?: string; type: "prompt"; message: string; images?: ImageContent[] }
| { id?: string; type: "queue_message"; message: string }
| { id?: string; type: "steer"; message: string }
| { id?: string; type: "follow_up"; message: string }
| { id?: string; type: "abort" }
| { id?: string; type: "new_session"; parentSession?: string }
@ -34,8 +35,9 @@ export type RpcCommand =
| { id?: string; type: "set_thinking_level"; level: ThinkingLevel }
| { id?: string; type: "cycle_thinking_level" }
// Queue mode
| { id?: string; type: "set_queue_mode"; mode: "all" | "one-at-a-time" }
// Queue modes
| { id?: string; type: "set_steering_mode"; mode: "all" | "one-at-a-time" }
| { id?: string; type: "set_follow_up_mode"; mode: "all" | "one-at-a-time" }
// Compaction
| { id?: string; type: "compact"; customInstructions?: string }
@ -69,12 +71,13 @@ export interface RpcSessionState {
thinkingLevel: ThinkingLevel;
isStreaming: boolean;
isCompacting: boolean;
queueMode: "all" | "one-at-a-time";
steeringMode: "all" | "one-at-a-time";
followUpMode: "all" | "one-at-a-time";
sessionFile?: string;
sessionId: string;
autoCompactionEnabled: boolean;
messageCount: number;
queuedMessageCount: number;
pendingMessageCount: number;
}
// ============================================================================
@ -85,7 +88,8 @@ export interface RpcSessionState {
export type RpcResponse =
// Prompting (async - events follow)
| { id?: string; type: "response"; command: "prompt"; success: true }
| { id?: string; type: "response"; command: "queue_message"; success: true }
| { id?: string; type: "response"; command: "steer"; success: true }
| { id?: string; type: "response"; command: "follow_up"; success: true }
| { id?: string; type: "response"; command: "abort"; success: true }
| { id?: string; type: "response"; command: "new_session"; success: true; data: { cancelled: boolean } }
@ -125,8 +129,9 @@ export type RpcResponse =
data: { level: ThinkingLevel } | null;
}
// Queue mode
| { id?: string; type: "response"; command: "set_queue_mode"; success: true }
// Queue modes
| { id?: string; type: "response"; command: "set_steering_mode"; success: true }
| { id?: string; type: "response"; command: "set_follow_up_mode"; success: true }
// Compaction
| { id?: string; type: "response"; command: "compact"; success: true; data: CompactionResult }

View file

@ -127,7 +127,7 @@ describe("AgentSession concurrent prompt guard", () => {
// Second prompt should reject
await expect(session.prompt("Second message")).rejects.toThrow(
"Agent is already processing. Use queueMessage() to queue messages during streaming.",
"Agent is already processing. Use steer() or followUp() to queue messages during streaming.",
);
// Cleanup
@ -135,16 +135,32 @@ describe("AgentSession concurrent prompt guard", () => {
await firstPrompt.catch(() => {}); // Ignore abort error
});
it("should allow queueMessage() while streaming", async () => {
it("should allow steer() while streaming", async () => {
createSession();
// Start first prompt
const firstPrompt = session.prompt("First message");
await new Promise((resolve) => setTimeout(resolve, 10));
// queueMessage should work while streaming
expect(() => session.queueMessage("Queued message")).not.toThrow();
expect(session.queuedMessageCount).toBe(1);
// steer should work while streaming
expect(() => session.steer("Steering message")).not.toThrow();
expect(session.pendingMessageCount).toBe(1);
// Cleanup
await session.abort();
await firstPrompt.catch(() => {});
});
it("should allow followUp() while streaming", async () => {
createSession();
// Start first prompt
const firstPrompt = session.prompt("First message");
await new Promise((resolve) => setTimeout(resolve, 10));
// followUp should work while streaming
expect(() => session.followUp("Follow-up message")).not.toThrow();
expect(session.pendingMessageCount).toBe(1);
// Cleanup
await session.abort();

View file

@ -495,11 +495,19 @@ export class MomSettingsManager {
}
// Compatibility methods for AgentSession
getQueueMode(): "all" | "one-at-a-time" {
getSteeringMode(): "all" | "one-at-a-time" {
return "one-at-a-time"; // Mom processes one message at a time
}
setQueueMode(_mode: "all" | "one-at-a-time"): void {
setSteeringMode(_mode: "all" | "one-at-a-time"): void {
// No-op for mom
}
getFollowUpMode(): "all" | "one-at-a-time" {
return "one-at-a-time"; // Mom processes one message at a time
}
setFollowUpMode(_mode: "all" | "one-at-a-time"): void {
// No-op for mom
}

View file

@ -346,7 +346,7 @@ const renderApp = () => {
onClick: () => {
// Demo: Inject custom message (will appear on next agent run)
if (agent) {
agent.queueMessage(
agent.steer(
createSystemNotification(
"This is a custom message! It appears in the UI but is never sent to the LLM.",
),