Fix event bus async error handling, clear pending messages on session switch, improve SDK docs

- event-bus.ts: await async handlers to catch errors properly
- agent-session.ts: clear _pendingNextTurnMessages on newSession/switchSession/branch
- sdk.ts: make eventBus first (required) param for discoverHooks/discoverCustomTools
- docs/sdk.md: document eventBus sharing pattern for hook/tool communication
This commit is contained in:
Mario Zechner 2026-01-04 22:04:50 +01:00
parent 024ec33bf2
commit be330fdd9c
4 changed files with 35 additions and 13 deletions

View file

@ -463,10 +463,14 @@ const { session } = await createAgentSession({
customTools: [{ tool: myTool }], customTools: [{ tool: myTool }],
}); });
// Merge with discovered tools // Merge with discovered tools (share eventBus for tool.events communication)
const discovered = await discoverCustomTools(); import { createEventBus } from "@mariozechner/pi-coding-agent";
const eventBus = createEventBus();
const discovered = await discoverCustomTools(eventBus);
const { session } = await createAgentSession({ const { session } = await createAgentSession({
customTools: [...discovered, { tool: myTool }], customTools: [...discovered, { tool: myTool }],
eventBus,
}); });
// Add paths without replacing discovery // Add paths without replacing discovery
@ -528,10 +532,14 @@ const { session } = await createAgentSession({
hooks: [], hooks: [],
}); });
// Merge with discovered // Merge with discovered (share eventBus for pi.events communication)
const discovered = await discoverHooks(); import { createEventBus } from "@mariozechner/pi-coding-agent";
const eventBus = createEventBus();
const discovered = await discoverHooks(eventBus);
const { session } = await createAgentSession({ const { session } = await createAgentSession({
hooks: [...discovered, { factory: loggingHook }], hooks: [...discovered, { factory: loggingHook }],
eventBus,
}); });
// Add paths without replacing // Add paths without replacing
@ -540,8 +548,12 @@ const { session } = await createAgentSession({
}); });
``` ```
**Event Bus:** If hooks or tools use `pi.events` for inter-component communication, pass the same `eventBus` to `discoverHooks()`, `discoverCustomTools()`, and `createAgentSession()`. Otherwise each gets an isolated bus and events won't be shared.
Hook API methods: Hook API methods:
- `api.on(event, handler)` - Subscribe to events - `api.on(event, handler)` - Subscribe to lifecycle events
- `api.events.emit(channel, data)` - Emit to shared event bus
- `api.events.on(channel, handler)` - Listen on shared event bus
- `api.sendMessage(message, triggerTurn?)` - Inject message (creates `CustomMessageEntry`) - `api.sendMessage(message, triggerTurn?)` - Inject message (creates `CustomMessageEntry`)
- `api.appendEntry(customType, data?)` - Persist hook state (not in LLM context) - `api.appendEntry(customType, data?)` - Persist hook state (not in LLM context)
- `api.registerCommand(name, options)` - Register custom slash command - `api.registerCommand(name, options)` - Register custom slash command
@ -778,10 +790,12 @@ const builtIn = getModel("anthropic", "claude-opus-4-5"); // Built-in only
const skills = discoverSkills(cwd, agentDir, skillsSettings); const skills = discoverSkills(cwd, agentDir, skillsSettings);
// Hooks (async - loads TypeScript) // Hooks (async - loads TypeScript)
const hooks = await discoverHooks(cwd, agentDir); // Pass eventBus to share pi.events across hooks/tools
const eventBus = createEventBus();
const hooks = await discoverHooks(eventBus, cwd, agentDir);
// Custom tools (async - loads TypeScript) // Custom tools (async - loads TypeScript)
const tools = await discoverCustomTools(cwd, agentDir); const tools = await discoverCustomTools(eventBus, cwd, agentDir);
// Context files // Context files
const contextFiles = discoverContextFiles(cwd, agentDir); const contextFiles = discoverContextFiles(cwd, agentDir);
@ -951,6 +965,9 @@ discoverCustomTools
discoverContextFiles discoverContextFiles
discoverSlashCommands discoverSlashCommands
// Event Bus (for shared hook/tool communication)
createEventBus
// Helpers // Helpers
loadSettings loadSettings
buildSystemPrompt buildSystemPrompt

View file

@ -865,6 +865,7 @@ export class AgentSession {
this.sessionManager.newSession(options); this.sessionManager.newSession(options);
this._steeringMessages = []; this._steeringMessages = [];
this._followUpMessages = []; this._followUpMessages = [];
this._pendingNextTurnMessages = [];
this._reconnectToAgent(); this._reconnectToAgent();
// Emit session_switch event with reason "new" to hooks // Emit session_switch event with reason "new" to hooks
@ -1653,6 +1654,7 @@ export class AgentSession {
await this.abort(); await this.abort();
this._steeringMessages = []; this._steeringMessages = [];
this._followUpMessages = []; this._followUpMessages = [];
this._pendingNextTurnMessages = [];
// Set new session // Set new session
this.sessionManager.setSessionFile(sessionPath); this.sessionManager.setSessionFile(sessionPath);
@ -1728,6 +1730,9 @@ export class AgentSession {
skipConversationRestore = result?.skipConversationRestore ?? false; skipConversationRestore = result?.skipConversationRestore ?? false;
} }
// Clear pending messages (bound to old session state)
this._pendingNextTurnMessages = [];
if (!selectedEntry.parentId) { if (!selectedEntry.parentId) {
this.sessionManager.newSession(); this.sessionManager.newSession();
} else { } else {

View file

@ -16,9 +16,9 @@ export function createEventBus(): EventBusController {
emitter.emit(channel, data); emitter.emit(channel, data);
}, },
on: (channel, handler) => { on: (channel, handler) => {
const safeHandler = (data: unknown) => { const safeHandler = async (data: unknown) => {
try { try {
handler(data); await handler(data);
} catch (err) { } catch (err) {
console.error(`Event handler error (${channel}):`, err); console.error(`Event handler error (${channel}):`, err);
} }

View file

@ -203,14 +203,14 @@ export function discoverModels(authStorage: AuthStorage, agentDir: string = getD
/** /**
* Discover hooks from cwd and agentDir. * Discover hooks from cwd and agentDir.
* @param eventBus - Shared event bus for pi.events communication. Pass to createAgentSession too.
* @param cwd - Current working directory * @param cwd - Current working directory
* @param agentDir - Agent configuration directory * @param agentDir - Agent configuration directory
* @param eventBus - Optional shared event bus (creates isolated bus if not provided)
*/ */
export async function discoverHooks( export async function discoverHooks(
eventBus: EventBus,
cwd?: string, cwd?: string,
agentDir?: string, agentDir?: string,
eventBus?: EventBus,
): Promise<Array<{ path: string; factory: HookFactory }>> { ): Promise<Array<{ path: string; factory: HookFactory }>> {
const resolvedCwd = cwd ?? process.cwd(); const resolvedCwd = cwd ?? process.cwd();
const resolvedAgentDir = agentDir ?? getDefaultAgentDir(); const resolvedAgentDir = agentDir ?? getDefaultAgentDir();
@ -230,14 +230,14 @@ export async function discoverHooks(
/** /**
* Discover custom tools from cwd and agentDir. * Discover custom tools from cwd and agentDir.
* @param eventBus - Shared event bus for tool.events communication. Pass to createAgentSession too.
* @param cwd - Current working directory * @param cwd - Current working directory
* @param agentDir - Agent configuration directory * @param agentDir - Agent configuration directory
* @param eventBus - Optional shared event bus (creates isolated bus if not provided)
*/ */
export async function discoverCustomTools( export async function discoverCustomTools(
eventBus: EventBus,
cwd?: string, cwd?: string,
agentDir?: string, agentDir?: string,
eventBus?: EventBus,
): Promise<Array<{ path: string; tool: CustomTool }>> { ): Promise<Array<{ path: string; tool: CustomTool }>> {
const resolvedCwd = cwd ?? process.cwd(); const resolvedCwd = cwd ?? process.cwd();
const resolvedAgentDir = agentDir ?? getDefaultAgentDir(); const resolvedAgentDir = agentDir ?? getDefaultAgentDir();