feat(coding-agent): add event bus for tool/hook communication (#431)

* feat(coding-agent): add event bus for tool/hook communication

Adds pi.events API enabling custom tools and hooks to communicate via
pub/sub. Tools can emit events, hooks can listen. Shared EventBus instance
created per session in createAgentSession().

- EventBus interface with emit() and on() methods
- on() returns unsubscribe function
- Threaded through hook and tool loaders
- Documented in hooks.md and custom-tools.md

* fix(coding-agent): wrap event handlers to catch errors

* docs: note async handler error handling for event bus

* feat(coding-agent): add sendMessage to tools, nextTurn delivery mode

- Custom tools now have pi.sendMessage() for direct agent notifications
- New deliverAs: 'nextTurn' queues messages for next user prompt
- Fix: hooks and tools now share the same eventBus (was isolated before)

* fix(coding-agent): nextTurn delivery should always queue, even when streaming
This commit is contained in:
Nico Bailon 2026-01-04 12:36:19 -08:00 committed by GitHub
parent 12805f61bd
commit 9c9e6822e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 293 additions and 33 deletions

View file

@ -155,6 +155,8 @@ export class AgentSession {
private _steeringMessages: string[] = [];
/** Tracks pending follow-up messages for UI display. Removed when delivered. */
private _followUpMessages: string[] = [];
/** Messages queued to be included with the next user prompt as context ("asides"). */
private _pendingNextTurnMessages: HookMessage[] = [];
// Compaction state
private _compactionAbortController: AbortController | undefined = undefined;
@ -605,6 +607,12 @@ export class AgentSession {
timestamp: Date.now(),
});
// Inject any pending "nextTurn" messages as context alongside the user message
for (const msg of this._pendingNextTurnMessages) {
messages.push(msg);
}
this._pendingNextTurnMessages = [];
// Emit before_agent_start hook event
if (this._hookRunner) {
const result = await this._hookRunner.emitBeforeAgentStart(expandedText, options?.images);
@ -752,11 +760,11 @@ export class AgentSession {
*
* @param message Hook message with customType, content, display, details
* @param options.triggerTurn If true and not streaming, triggers a new LLM turn
* @param options.deliverAs When streaming, use "steer" (default) for immediate or "followUp" to wait
* @param options.deliverAs Delivery mode: "steer", "followUp", or "nextTurn"
*/
async sendHookMessage<T = unknown>(
message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" },
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" | "nextTurn" },
): Promise<void> {
const appMessage = {
role: "hookMessage" as const,
@ -766,18 +774,17 @@ export class AgentSession {
details: message.details,
timestamp: Date.now(),
} satisfies HookMessage<T>;
if (this.isStreaming) {
// Queue for processing by agent loop
if (options?.deliverAs === "nextTurn") {
this._pendingNextTurnMessages.push(appMessage);
} else if (this.isStreaming) {
if (options?.deliverAs === "followUp") {
this.agent.followUp(appMessage);
} else {
this.agent.steer(appMessage);
}
} else if (options?.triggerTurn) {
// Send as prompt - agent loop will emit message events
await this.agent.prompt(appMessage);
} else {
// Just append to agent state and session, no turn
this.agent.appendMessage(appMessage);
this.sessionManager.appendCustomMessageEntry(
message.customType,

View file

@ -15,6 +15,7 @@ import { fileURLToPath } from "node:url";
import { createJiti } from "jiti";
import { getAgentDir, isBunBinary } from "../../config.js";
import { theme } from "../../modes/interactive/theme/theme.js";
import { createEventBus, type EventBus } from "../event-bus.js";
import type { ExecOptions } from "../exec.js";
import { execCommand } from "../exec.js";
import type { HookUIContext } from "../hooks/types.js";
@ -213,10 +214,12 @@ export async function loadCustomTools(
paths: string[],
cwd: string,
builtInToolNames: string[],
eventBus?: EventBus,
): Promise<CustomToolsLoadResult> {
const tools: LoadedCustomTool[] = [];
const errors: Array<{ path: string; error: string }> = [];
const seenNames = new Set<string>(builtInToolNames);
const resolvedEventBus = eventBus ?? createEventBus();
// Shared API object - all tools get the same instance
const sharedApi: CustomToolAPI = {
@ -225,6 +228,8 @@ export async function loadCustomTools(
execCommand(command, args, options?.cwd ?? cwd, options),
ui: createNoOpUIContext(),
hasUI: false,
events: resolvedEventBus,
sendMessage: () => {},
};
for (const toolPath of paths) {
@ -259,6 +264,9 @@ export async function loadCustomTools(
sharedApi.ui = uiContext;
sharedApi.hasUI = hasUI;
},
setSendMessageHandler(handler) {
sharedApi.sendMessage = handler;
},
};
}
@ -303,12 +311,14 @@ function discoverToolsInDir(dir: string): string[] {
* @param cwd - Current working directory
* @param builtInToolNames - Names of built-in tools to check for conflicts
* @param agentDir - Agent config directory. Default: from getAgentDir()
* @param eventBus - Optional shared event bus (creates isolated bus if not provided)
*/
export async function discoverAndLoadCustomTools(
configuredPaths: string[],
cwd: string,
builtInToolNames: string[],
agentDir: string = getAgentDir(),
eventBus?: EventBus,
): Promise<CustomToolsLoadResult> {
const allPaths: string[] = [];
const seen = new Set<string>();
@ -335,5 +345,5 @@ export async function discoverAndLoadCustomTools(
// 3. Explicitly configured paths (can override/add)
addPaths(configuredPaths.map((p) => resolveToolPath(p, cwd)));
return loadCustomTools(allPaths, cwd, builtInToolNames);
return loadCustomTools(allPaths, cwd, builtInToolNames, eventBus);
}

View file

@ -10,8 +10,10 @@ import type { Model } from "@mariozechner/pi-ai";
import type { Component } from "@mariozechner/pi-tui";
import type { Static, TSchema } from "@sinclair/typebox";
import type { Theme } from "../../modes/interactive/theme/theme.js";
import type { EventBus } from "../event-bus.js";
import type { ExecOptions, ExecResult } from "../exec.js";
import type { HookUIContext } from "../hooks/types.js";
import type { HookMessage } from "../messages.js";
import type { ModelRegistry } from "../model-registry.js";
import type { ReadonlySessionManager } from "../session-manager.js";
@ -34,6 +36,30 @@ export interface CustomToolAPI {
ui: CustomToolUIContext;
/** Whether UI is available (false in print/RPC mode) */
hasUI: boolean;
/** Shared event bus for tool/hook communication */
events: EventBus;
/**
* Send a message to the agent session.
*
* Delivery behavior depends on agent state and options:
* - Streaming + "steer" (default): Interrupt mid-run, delivered after current tool.
* - Streaming + "followUp": Wait until agent finishes before delivery.
* - Idle + triggerTurn: Triggers a new LLM turn immediately.
* - Idle + "nextTurn": Queue to be included with the next user message as context.
* - Idle + neither: Append to session history as standalone entry.
*
* @param message - The message to send
* @param message.customType - Identifier for your tool
* @param message.content - Message content (string or TextContent/ImageContent array)
* @param message.display - Whether to show in TUI (true = styled display, false = hidden)
* @param message.details - Optional tool-specific metadata (not sent to LLM)
* @param options.triggerTurn - If true and agent is idle, triggers a new LLM turn
* @param options.deliverAs - Delivery mode: "steer", "followUp", or "nextTurn"
*/
sendMessage<T = unknown>(
message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" | "nextTurn" },
): void;
}
/**
@ -156,10 +182,18 @@ export interface LoadedCustomTool {
tool: CustomTool;
}
/** Send message handler type for tool sendMessage */
export type ToolSendMessageHandler = <T = unknown>(
message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" | "nextTurn" },
) => void;
/** Result from loading custom tools */
export interface CustomToolsLoadResult {
tools: LoadedCustomTool[];
errors: Array<{ path: string; error: string }>;
/** Update the UI context for all loaded tools. Call when mode initializes. */
setUIContext(uiContext: CustomToolUIContext, hasUI: boolean): void;
/** Set the sendMessage handler for all loaded tools. Call when session initializes. */
setSendMessageHandler(handler: ToolSendMessageHandler): void;
}

View file

@ -0,0 +1,33 @@
import { EventEmitter } from "node:events";
export interface EventBus {
emit(channel: string, data: unknown): void;
on(channel: string, handler: (data: unknown) => void): () => void;
}
export interface EventBusController extends EventBus {
clear(): void;
}
export function createEventBus(): EventBusController {
const emitter = new EventEmitter();
return {
emit: (channel, data) => {
emitter.emit(channel, data);
},
on: (channel, handler) => {
const safeHandler = (data: unknown) => {
try {
handler(data);
} catch (err) {
console.error(`Event handler error (${channel}):`, err);
}
};
emitter.on(channel, safeHandler);
return () => emitter.off(channel, safeHandler);
},
clear: () => {
emitter.removeAllListeners();
},
};
}

View file

@ -10,6 +10,7 @@ import { fileURLToPath } from "node:url";
import type { KeyId } from "@mariozechner/pi-tui";
import { createJiti } from "jiti";
import { getAgentDir } from "../../config.js";
import { createEventBus, type EventBus } from "../event-bus.js";
import type { HookMessage } from "../messages.js";
import type { SessionManager } from "../session-manager.js";
import { execCommand } from "./runner.js";
@ -61,7 +62,7 @@ type HandlerFn = (...args: unknown[]) => Promise<unknown>;
*/
export type SendMessageHandler = <T = unknown>(
message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" },
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" | "nextTurn" },
) => void;
/**
@ -221,6 +222,7 @@ function createHookAPI(
handlers: Map<string, HandlerFn[]>,
cwd: string,
hookPath: string,
eventBus: EventBus,
): {
api: HookAPI;
messageRenderers: Map<string, HookMessageRenderer>;
@ -292,7 +294,6 @@ function createHookAPI(
options: { description?: string; type: "boolean" | "string"; default?: boolean | string },
): void {
flags.set(name, { name, hookPath, ...options });
// Set default value if provided
if (options.default !== undefined) {
flagValues.set(name, options.default);
}
@ -309,6 +310,7 @@ function createHookAPI(
): void {
shortcuts.set(shortcut, { shortcut, hookPath, ...options });
},
events: eventBus,
} as HookAPI;
return {
@ -342,7 +344,11 @@ function createHookAPI(
/**
* Load a single hook module using jiti.
*/
async function loadHook(hookPath: string, cwd: string): Promise<{ hook: LoadedHook | null; error: string | null }> {
async function loadHook(
hookPath: string,
cwd: string,
eventBus: EventBus,
): Promise<{ hook: LoadedHook | null; error: string | null }> {
const resolvedPath = resolveHookPath(hookPath, cwd);
try {
@ -376,7 +382,7 @@ async function loadHook(hookPath: string, cwd: string): Promise<{ hook: LoadedHo
setGetAllToolsHandler,
setSetActiveToolsHandler,
setFlagValue,
} = createHookAPI(handlers, cwd, hookPath);
} = createHookAPI(handlers, cwd, hookPath, eventBus);
// Call factory to register handlers
factory(api);
@ -410,13 +416,15 @@ async function loadHook(hookPath: string, cwd: string): Promise<{ hook: LoadedHo
* Load all hooks from configuration.
* @param paths - Array of hook file paths
* @param cwd - Current working directory for resolving relative paths
* @param eventBus - Optional shared event bus (creates isolated bus if not provided)
*/
export async function loadHooks(paths: string[], cwd: string): Promise<LoadHooksResult> {
export async function loadHooks(paths: string[], cwd: string, eventBus?: EventBus): Promise<LoadHooksResult> {
const hooks: LoadedHook[] = [];
const errors: Array<{ path: string; error: string }> = [];
const resolvedEventBus = eventBus ?? createEventBus();
for (const hookPath of paths) {
const { hook, error } = await loadHook(hookPath, cwd);
const { hook, error } = await loadHook(hookPath, cwd, resolvedEventBus);
if (error) {
errors.push({ path: hookPath, error });
@ -456,11 +464,17 @@ function discoverHooksInDir(dir: string): string[] {
* 2. cwd/.pi/hooks/*.ts (project-local)
*
* Plus any explicitly configured paths from settings.
*
* @param configuredPaths - Explicitly configured hook paths
* @param cwd - Current working directory
* @param agentDir - Agent configuration directory
* @param eventBus - Optional shared event bus (creates isolated bus if not provided)
*/
export async function discoverAndLoadHooks(
configuredPaths: string[],
cwd: string,
agentDir: string = getAgentDir(),
eventBus?: EventBus,
): Promise<LoadHooksResult> {
const allPaths: string[] = [];
const seen = new Set<string>();
@ -487,5 +501,5 @@ export async function discoverAndLoadHooks(
// 3. Explicitly configured paths (can override/add)
addPaths(configuredPaths.map((p) => resolveHookPath(p, cwd)));
return loadHooks(allPaths, cwd);
return loadHooks(allPaths, cwd, eventBus);
}

View file

@ -10,6 +10,7 @@ import type { ImageContent, Model, TextContent, ToolResultMessage } from "@mario
import type { Component, KeyId, TUI } from "@mariozechner/pi-tui";
import type { Theme } from "../../modes/interactive/theme/theme.js";
import type { CompactionPreparation, CompactionResult } from "../compaction/index.js";
import type { EventBus } from "../event-bus.js";
import type { ExecOptions, ExecResult } from "../exec.js";
import type { HookMessage } from "../messages.js";
import type { ModelRegistry } from "../model-registry.js";
@ -747,15 +748,19 @@ export interface HookAPI {
* @param message.content - Message content (string or TextContent/ImageContent array)
* @param message.display - Whether to show in TUI (true = styled display, false = hidden)
* @param message.details - Optional hook-specific metadata (not sent to LLM)
* @param options.triggerTurn - If true and agent is idle, triggers a new LLM turn. Default: false.
* @param options.triggerTurn - If true and agent is idle, triggers a new LLM turn.
* Required for async patterns where you want the agent to respond.
* If agent is streaming, message is queued and triggerTurn is ignored.
* @param options.deliverAs - How to deliver when agent is streaming. Default: "steer".
* - "steer": Interrupt mid-run, delivered after current tool execution.
* - "followUp": Wait until agent finishes all work before delivery.
* @param options.deliverAs - How to deliver the message. Default: "steer".
* - "steer": (streaming) Interrupt mid-run, delivered after current tool execution.
* - "followUp": (streaming) Wait until agent finishes all work before delivery.
* - "nextTurn": (idle) Queue to be included with the next user message as context.
* The message becomes an "aside" - context for the next turn without
* triggering a turn or appearing as a standalone entry.
*/
sendMessage<T = unknown>(
message: Pick<HookMessage<T>, "customType" | "content" | "display" | "details">,
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" },
options?: { triggerTurn?: boolean; deliverAs?: "steer" | "followUp" | "nextTurn" },
): void;
/**
@ -899,6 +904,21 @@ export interface HookAPI {
handler: (ctx: HookContext) => Promise<void> | void;
},
): void;
/**
* Shared event bus for tool/hook communication.
* Tools can emit events, hooks can listen for them.
*
* @example
* // Hook listening for events
* pi.events.on("subagent:complete", (data) => {
* pi.sendMessage({ customType: "notify", content: `Done: ${data.summary}` });
* });
*
* // Tool emitting events (in custom tool)
* pi.events.emit("my:event", { status: "complete" });
*/
events: EventBus;
}
/**

View file

@ -25,6 +25,7 @@ export {
loadCustomTools,
type RenderResultOptions,
} from "./custom-tools/index.js";
export { createEventBus, type EventBus, type EventBusController } from "./event-bus.js";
export {
type HookAPI,
type HookContext,

View file

@ -43,6 +43,7 @@ import {
wrapCustomTools,
} from "./custom-tools/index.js";
import type { CustomTool } from "./custom-tools/types.js";
import { createEventBus, type EventBus } from "./event-bus.js";
import { discoverAndLoadHooks, HookRunner, type LoadedHook, wrapToolsWithHooks } from "./hooks/index.js";
import type { HookFactory } from "./hooks/types.js";
import { convertToLlm } from "./messages.js";
@ -118,6 +119,9 @@ export interface CreateAgentSessionOptions {
/** Pre-loaded hooks (skips loading, used when hooks were loaded early for CLI flags). */
preloadedHooks?: LoadedHook[];
/** Shared event bus for tool/hook communication. Default: creates new bus. */
eventBus?: EventBus;
/** Skills. Default: discovered from multiple locations */
skills?: Skill[];
/** Context files (AGENTS.md content). Default: discovered walking up from cwd */
@ -199,15 +203,19 @@ export function discoverModels(authStorage: AuthStorage, agentDir: string = getD
/**
* Discover hooks from cwd and agentDir.
* @param cwd - Current working directory
* @param agentDir - Agent configuration directory
* @param eventBus - Optional shared event bus (creates isolated bus if not provided)
*/
export async function discoverHooks(
cwd?: string,
agentDir?: string,
eventBus?: EventBus,
): Promise<Array<{ path: string; factory: HookFactory }>> {
const resolvedCwd = cwd ?? process.cwd();
const resolvedAgentDir = agentDir ?? getDefaultAgentDir();
const { hooks, errors } = await discoverAndLoadHooks([], resolvedCwd, resolvedAgentDir);
const { hooks, errors } = await discoverAndLoadHooks([], resolvedCwd, resolvedAgentDir, eventBus);
// Log errors but don't fail
for (const { path, error } of errors) {
@ -222,15 +230,25 @@ export async function discoverHooks(
/**
* Discover custom tools from cwd and agentDir.
* @param cwd - Current working directory
* @param agentDir - Agent configuration directory
* @param eventBus - Optional shared event bus (creates isolated bus if not provided)
*/
export async function discoverCustomTools(
cwd?: string,
agentDir?: string,
eventBus?: EventBus,
): Promise<Array<{ path: string; tool: CustomTool }>> {
const resolvedCwd = cwd ?? process.cwd();
const resolvedAgentDir = agentDir ?? getDefaultAgentDir();
const { tools, errors } = await discoverAndLoadCustomTools([], resolvedCwd, Object.keys(allTools), resolvedAgentDir);
const { tools, errors } = await discoverAndLoadCustomTools(
[],
resolvedCwd,
Object.keys(allTools),
resolvedAgentDir,
eventBus,
);
// Log errors but don't fail
for (const { path, error } of errors) {
@ -344,7 +362,10 @@ function createFactoryFromLoadedHook(loaded: LoadedHook): HookFactory {
/**
* Convert hook definitions to LoadedHooks for the HookRunner.
*/
function createLoadedHooksFromDefinitions(definitions: Array<{ path?: string; factory: HookFactory }>): LoadedHook[] {
function createLoadedHooksFromDefinitions(
definitions: Array<{ path?: string; factory: HookFactory }>,
eventBus: EventBus,
): LoadedHook[] {
return definitions.map((def) => {
const hookPath = def.path ?? "<inline>";
const handlers = new Map<string, Array<(...args: unknown[]) => Promise<unknown>>>();
@ -401,6 +422,7 @@ function createLoadedHooksFromDefinitions(definitions: Array<{ path?: string; fa
getActiveTools: () => getActiveToolsHandler(),
getAllTools: () => getAllToolsHandler(),
setActiveTools: (toolNames: string[]) => setActiveToolsHandler(toolNames),
events: eventBus,
};
def.factory(api as any);
@ -484,6 +506,7 @@ function createLoadedHooksFromDefinitions(definitions: Array<{ path?: string; fa
export async function createAgentSession(options: CreateAgentSessionOptions = {}): Promise<CreateAgentSessionResult> {
const cwd = options.cwd ?? process.cwd();
const agentDir = options.agentDir ?? getDefaultAgentDir();
const eventBus = options.eventBus ?? createEventBus();
// Use provided or create AuthStorage and ModelRegistry
const authStorage = options.authStorage ?? discoverAuthStorage(agentDir);
@ -591,11 +614,18 @@ export async function createAgentSession(options: CreateAgentSessionOptions = {}
tools: loadedTools,
errors: [],
setUIContext: () => {},
setSendMessageHandler: () => {},
};
} else {
// Discover custom tools, merging with additional paths
const configuredPaths = [...settingsManager.getCustomToolPaths(), ...(options.additionalCustomToolPaths ?? [])];
customToolsResult = await discoverAndLoadCustomTools(configuredPaths, cwd, Object.keys(allTools), agentDir);
customToolsResult = await discoverAndLoadCustomTools(
configuredPaths,
cwd,
Object.keys(allTools),
agentDir,
eventBus,
);
time("discoverAndLoadCustomTools");
for (const { path, error } of customToolsResult.errors) {
console.error(`Failed to load custom tool "${path}": ${error}`);
@ -608,13 +638,13 @@ export async function createAgentSession(options: CreateAgentSessionOptions = {}
hookRunner = new HookRunner(options.preloadedHooks, cwd, sessionManager, modelRegistry);
} else if (options.hooks !== undefined) {
if (options.hooks.length > 0) {
const loadedHooks = createLoadedHooksFromDefinitions(options.hooks);
const loadedHooks = createLoadedHooksFromDefinitions(options.hooks, eventBus);
hookRunner = new HookRunner(loadedHooks, cwd, sessionManager, modelRegistry);
}
} else {
// Discover hooks, merging with additional paths
const configuredPaths = [...settingsManager.getHookPaths(), ...(options.additionalHookPaths ?? [])];
const { hooks, errors } = await discoverAndLoadHooks(configuredPaths, cwd, agentDir);
const { hooks, errors } = await discoverAndLoadHooks(configuredPaths, cwd, agentDir, eventBus);
time("discoverAndLoadHooks");
for (const { path, error } of errors) {
console.error(`Failed to load hook "${path}": ${error}`);
@ -755,6 +785,11 @@ export async function createAgentSession(options: CreateAgentSessionOptions = {}
});
time("createAgentSession");
// Wire up sendMessage for custom tools
customToolsResult.setSendMessageHandler((msg, opts) => {
session.sendHookMessage(msg, opts);
});
return {
session,
customToolsResult,