Fix slash commands and hook commands during streaming

- Hook commands now execute immediately during streaming (they manage their own LLM interaction via pi.sendMessage())
- File-based slash commands are expanded and queued via steer/followUp during streaming
- prompt() accepts new streamingBehavior option ('steer' or 'followUp') for explicit queueing during streaming
- steer() and followUp() now expand file-based slash commands and error on hook commands
- RPC prompt command accepts optional streamingBehavior field
- Updated docs: rpc.md, sdk.md, CHANGELOG.md

fixes #420
This commit is contained in:
Mario Zechner 2026-01-03 15:36:54 +01:00
parent 308c0e0ec0
commit e9cf3c1835
7 changed files with 207 additions and 52 deletions

View file

@ -6,6 +6,16 @@
- `$ARGUMENTS` syntax for custom slash commands as alternative to `$@` for all arguments joined. Aligns with patterns used by Claude, Codex, and OpenCode. Both syntaxes remain fully supported. ([#418](https://github.com/badlogic/pi-mono/pull/418) by [@skuridin](https://github.com/skuridin))
### Changed
- **Slash commands and hook commands now work during streaming**: Previously, using a slash command or hook command while the agent was streaming would crash with "Agent is already processing". Now:
- Hook commands execute immediately (they manage their own LLM interaction via `pi.sendMessage()`)
- File-based slash commands are expanded and queued via steer/followUp
- `steer()` and `followUp()` now expand file-based slash commands and error on hook commands (hook commands cannot be queued)
- `prompt()` accepts new `streamingBehavior` option (`"steer"` or `"followUp"`) to specify queueing behavior during streaming
- RPC `prompt` command now accepts optional `streamingBehavior` field
([#420](https://github.com/badlogic/pi-mono/issues/420))
### Fixed
- Slash command argument substitution now processes positional arguments (`$1`, `$2`, etc.) before all-arguments (`$@`, `$ARGUMENTS`) to prevent recursive substitution when argument values contain dollar-digit patterns like `$100`. ([#418](https://github.com/badlogic/pi-mono/pull/418) by [@skuridin](https://github.com/skuridin))

View file

@ -41,6 +41,21 @@ With images:
{"type": "prompt", "message": "What's in this image?", "images": [{"type": "image", "source": {"type": "base64", "mediaType": "image/png", "data": "..."}}]}
```
**During streaming**: If the agent is already streaming, you must specify `streamingBehavior` to queue the message:
```json
{"type": "prompt", "message": "New instruction", "streamingBehavior": "steer"}
```
- `"steer"`: Interrupt the agent mid-run. Message is delivered after current tool execution, remaining tools are skipped.
- `"followUp"`: Wait until the agent finishes. Message is delivered only when agent stops.
If the agent is streaming and no `streamingBehavior` is specified, the command returns an error.
**Hook commands**: If the message is a hook command (e.g., `/mycommand`), it executes immediately even during streaming. Hook commands manage their own LLM interaction via `pi.sendMessage()`.
**Slash commands**: File-based slash commands (from `.md` files) are expanded before sending/queueing.
Response:
```json
{"id": "req-1", "type": "response", "command": "prompt", "success": true}
@ -48,20 +63,35 @@ Response:
The `images` field is optional. Each image uses `ImageContent` format with base64 or URL source.
#### queue_message
#### steer
Queue a message to be injected at the next agent turn. Queued messages are added to the conversation without triggering a new prompt. Useful for injecting context mid-conversation.
Queue a steering message to interrupt the agent mid-run. Delivered after current tool execution, remaining tools are skipped. File-based slash commands are expanded. Hook commands are not allowed (use `prompt` instead).
```json
{"type": "queue_message", "message": "Additional context"}
{"type": "steer", "message": "Stop and do this instead"}
```
Response:
```json
{"type": "response", "command": "queue_message", "success": true}
{"type": "response", "command": "steer", "success": true}
```
See [set_queue_mode](#set_queue_mode) for controlling how queued messages are processed.
See [set_steering_mode](#set_steering_mode) for controlling how steering messages are processed.
#### follow_up
Queue a follow-up message to be processed after the agent finishes. Delivered only when agent has no more tool calls or steering messages. File-based slash commands are expanded. Hook commands are not allowed (use `prompt` instead).
```json
{"type": "follow_up", "message": "After you're done, also do this"}
```
Response:
```json
{"type": "response", "command": "follow_up", "success": true}
```
See [set_follow_up_mode](#set_follow_up_mode) for controlling how follow-up messages are processed.
#### abort
@ -120,12 +150,13 @@ Response:
"thinkingLevel": "medium",
"isStreaming": false,
"isCompacting": false,
"queueMode": "all",
"steeringMode": "all",
"followUpMode": "one-at-a-time",
"sessionFile": "/path/to/session.jsonl",
"sessionId": "abc123",
"autoCompactionEnabled": true,
"messageCount": 5,
"queuedMessageCount": 0
"pendingMessageCount": 0
}
}
```
@ -253,23 +284,40 @@ Response:
}
```
### Queue Mode
### Queue Modes
#### set_queue_mode
#### set_steering_mode
Control how queued messages (from `queue_message`) are injected into the conversation.
Control how steering messages (from `steer`) are delivered.
```json
{"type": "set_queue_mode", "mode": "one-at-a-time"}
{"type": "set_steering_mode", "mode": "one-at-a-time"}
```
Modes:
- `"all"`: Inject all queued messages at the next turn
- `"one-at-a-time"`: Inject one queued message per turn (default)
- `"all"`: Deliver all steering messages at the next interruption point
- `"one-at-a-time"`: Deliver one steering message per interruption (default)
Response:
```json
{"type": "response", "command": "set_queue_mode", "success": true}
{"type": "response", "command": "set_steering_mode", "success": true}
```
#### set_follow_up_mode
Control how follow-up messages (from `follow_up`) are delivered.
```json
{"type": "set_follow_up_mode", "mode": "one-at-a-time"}
```
Modes:
- `"all"`: Deliver all follow-up messages when agent finishes
- `"one-at-a-time"`: Deliver one follow-up message per agent completion (default)
Response:
```json
{"type": "response", "command": "set_follow_up_mode", "success": true}
```
### Compaction

View file

@ -77,8 +77,13 @@ The session manages the agent lifecycle, message history, and event streaming.
```typescript
interface AgentSession {
// Send a prompt and wait for completion
// If streaming, requires streamingBehavior option to queue the message
prompt(text: string, options?: PromptOptions): Promise<void>;
// Queue messages during streaming
steer(text: string): Promise<void>; // Interrupt: delivered after current tool, skips remaining
followUp(text: string): Promise<void>; // Wait: delivered only when agent finishes
// Subscribe to events (returns unsubscribe function)
subscribe(listener: (event: AgentSessionEvent) => void): () => void;
@ -122,6 +127,41 @@ interface AgentSession {
}
```
### Prompting and Message Queueing
The `prompt()` method handles slash commands, hook commands, and message sending:
```typescript
// Basic prompt (when not streaming)
await session.prompt("What files are here?");
// With images
await session.prompt("What's in this image?", {
images: [{ type: "image", source: { type: "base64", mediaType: "image/png", data: "..." } }]
});
// During streaming: must specify how to queue the message
await session.prompt("Stop and do this instead", { streamingBehavior: "steer" });
await session.prompt("After you're done, also check X", { streamingBehavior: "followUp" });
```
**Behavior:**
- **Hook commands** (e.g., `/mycommand`): Execute immediately, even during streaming. They manage their own LLM interaction via `pi.sendMessage()`.
- **File-based slash commands** (from `.md` files): Expanded to their content before sending/queueing.
- **During streaming without `streamingBehavior`**: Throws an error. Use `steer()` or `followUp()` directly, or specify the option.
For explicit queueing during streaming:
```typescript
// Interrupt the agent (delivered after current tool, skips remaining tools)
await session.steer("New instruction");
// Wait for agent to finish (delivered only when agent stops)
await session.followUp("After you're done, also do this");
```
Both `steer()` and `followUp()` expand file-based slash commands but error on hook commands (hook commands cannot be queued).
### Agent and AgentState
The `Agent` class (from `@mariozechner/pi-agent-core`) handles the core LLM interaction. Access it via `session.agent`.

View file

@ -83,6 +83,8 @@ export interface PromptOptions {
expandSlashCommands?: boolean;
/** Image attachments */
images?: ImageContent[];
/** When streaming, how to queue the message: "steer" (interrupt) or "followUp" (wait). Required if streaming. */
streamingBehavior?: "steer" | "followUp";
}
/** Result from cycleModel() */
@ -461,22 +463,18 @@ export class AgentSession {
/**
* Send a prompt to the agent.
* - Validates model and API key before sending
* - Handles hook commands (registered via pi.registerCommand)
* - Handles hook commands (registered via pi.registerCommand) immediately, even during streaming
* - Expands file-based slash commands by default
* @throws Error if no model selected or no API key available
* - During streaming, queues via steer() or followUp() based on streamingBehavior option
* - Validates model and API key before sending (when not streaming)
* @throws Error if streaming and no streamingBehavior specified
* @throws Error if no model selected or no API key available (when not streaming)
*/
async prompt(text: string, options?: PromptOptions): Promise<void> {
if (this.isStreaming) {
throw new Error("Agent is already processing. Use steer() or followUp() to queue messages during streaming.");
}
// Flush any pending bash messages before the new prompt
this._flushPendingBashMessages();
const expandCommands = options?.expandSlashCommands ?? true;
// Handle hook commands first (if enabled and text is a slash command)
// Handle hook commands first (execute immediately, even during streaming)
// Hook commands manage their own LLM interaction via pi.sendMessage()
if (expandCommands && text.startsWith("/")) {
const handled = await this._tryExecuteHookCommand(text);
if (handled) {
@ -485,6 +483,27 @@ export class AgentSession {
}
}
// Expand file-based slash commands if requested
const expandedText = expandCommands ? expandSlashCommand(text, [...this._fileCommands]) : text;
// If streaming, queue via steer() or followUp() based on option
if (this.isStreaming) {
if (!options?.streamingBehavior) {
throw new Error(
"Agent is already processing. Specify streamingBehavior ('steer' or 'followUp') to queue the message.",
);
}
if (options.streamingBehavior === "followUp") {
await this._queueFollowUp(expandedText);
} else {
await this._queueSteer(expandedText);
}
return;
}
// Flush any pending bash messages before the new prompt
this._flushPendingBashMessages();
// Validate model
if (!this.model) {
throw new Error(
@ -509,9 +528,6 @@ export class AgentSession {
await this._checkCompaction(lastAssistant, false);
}
// Expand file-based slash commands if requested
const expandedText = expandCommands ? expandSlashCommand(text, [...this._fileCommands]) : text;
// Build messages array (hook message if any, then user message)
const messages: AgentMessage[] = [];
@ -579,8 +595,43 @@ export class AgentSession {
/**
* Queue a steering message to interrupt the agent mid-run.
* Delivered after current tool execution, skips remaining tools.
* Expands file-based slash commands. Errors on hook commands.
* @throws Error if text is a hook command
*/
async steer(text: string): Promise<void> {
// Check for hook commands (cannot be queued)
if (text.startsWith("/")) {
this._throwIfHookCommand(text);
}
// Expand file-based slash commands
const expandedText = expandSlashCommand(text, [...this._fileCommands]);
await this._queueSteer(expandedText);
}
/**
* Queue a follow-up message to be processed after the agent finishes.
* Delivered only when agent has no more tool calls or steering messages.
* Expands file-based slash commands. Errors on hook commands.
* @throws Error if text is a hook command
*/
async followUp(text: string): Promise<void> {
// Check for hook commands (cannot be queued)
if (text.startsWith("/")) {
this._throwIfHookCommand(text);
}
// Expand file-based slash commands
const expandedText = expandSlashCommand(text, [...this._fileCommands]);
await this._queueFollowUp(expandedText);
}
/**
* Internal: Queue a steering message (already expanded, no hook command check).
*/
private async _queueSteer(text: string): Promise<void> {
this._steeringMessages.push(text);
this.agent.steer({
role: "user",
@ -590,10 +641,9 @@ export class AgentSession {
}
/**
* Queue a follow-up message to be processed after the agent finishes.
* Delivered only when agent has no more tool calls or steering messages.
* Internal: Queue a follow-up message (already expanded, no hook command check).
*/
async followUp(text: string): Promise<void> {
private async _queueFollowUp(text: string): Promise<void> {
this._followUpMessages.push(text);
this.agent.followUp({
role: "user",
@ -602,6 +652,23 @@ export class AgentSession {
});
}
/**
* Throw an error if the text is a hook command.
*/
private _throwIfHookCommand(text: string): void {
if (!this._hookRunner) return;
const spaceIndex = text.indexOf(" ");
const commandName = spaceIndex === -1 ? text.slice(1) : text.slice(1, spaceIndex);
const command = this._hookRunner.getCommand(commandName);
if (command) {
throw new Error(
`Hook command "/${commandName}" cannot be queued. Use prompt() or execute the command when not streaming.`,
);
}
}
/**
* Send a hook message to the session. Creates a CustomMessageEntry.
*

View file

@ -915,26 +915,13 @@ export class InteractiveMode {
return;
}
// Hook commands always run immediately, even during streaming
// (if they need to interact with LLM, they use pi.sendMessage which handles queueing)
if (text.startsWith("/") && this.session.hookRunner) {
const spaceIndex = text.indexOf(" ");
const commandName = spaceIndex === -1 ? text.slice(1) : text.slice(1, spaceIndex);
const command = this.session.hookRunner.getCommand(commandName);
if (command) {
this.editor.addToHistory(text);
this.editor.setText("");
await this.session.prompt(text);
return;
}
}
// Queue steering message if agent is streaming (interrupts current work)
// If streaming, use prompt() with steer behavior
// This handles hook commands (execute immediately), slash command expansion, and queueing
if (this.session.isStreaming) {
await this.session.steer(text);
this.updatePendingMessagesDisplay();
this.editor.addToHistory(text);
this.editor.setText("");
await this.session.prompt(text, { streamingBehavior: "steer" });
this.updatePendingMessagesDisplay();
this.ui.requestRender();
return;
}
@ -1461,11 +1448,12 @@ export class InteractiveMode {
if (!text) return;
// Alt+Enter queues a follow-up message (waits until agent finishes)
// This handles hook commands (execute immediately), slash command expansion, and queueing
if (this.session.isStreaming) {
await this.session.followUp(text);
this.updatePendingMessagesDisplay();
this.editor.addToHistory(text);
this.editor.setText("");
await this.session.prompt(text, { streamingBehavior: "followUp" });
this.updatePendingMessagesDisplay();
this.ui.requestRender();
}
// If not streaming, Alt+Enter acts like regular Enter (trigger onSubmit)

View file

@ -244,10 +244,12 @@ export async function runRpcMode(session: AgentSession): Promise<never> {
case "prompt": {
// Don't await - events will stream
// Hook commands and file slash commands are handled in session.prompt()
// Hook commands are executed immediately, file slash commands are expanded
// If streaming and streamingBehavior specified, queues via steer/followUp
session
.prompt(command.message, {
images: command.images,
streamingBehavior: command.streamingBehavior,
})
.catch((e) => output(error(id, "prompt", e.message)));
return success(id, "prompt");

View file

@ -17,7 +17,7 @@ import type { CompactionResult } from "../../core/compaction/index.js";
export type RpcCommand =
// Prompting
| { id?: string; type: "prompt"; message: string; images?: ImageContent[] }
| { id?: string; type: "prompt"; message: string; images?: ImageContent[]; streamingBehavior?: "steer" | "followUp" }
| { id?: string; type: "steer"; message: string }
| { id?: string; type: "follow_up"; message: string }
| { id?: string; type: "abort" }