co-mono/packages/coding-agent/src/modes/rpc/rpc-mode.ts
Mario Zechner e7097d911a Custom tools with session lifecycle, examples for hooks and tools
- Custom tools: TypeScript modules that extend pi with new tools
  - Custom TUI rendering via renderCall/renderResult
  - User interaction via pi.ui (select, confirm, input, notify)
  - Session lifecycle via onSession callback for state reconstruction
  - Examples: todo.ts, question.ts, hello.ts

- Hook examples: permission-gate, git-checkpoint, protected-paths

- Session lifecycle centralized in AgentSession
  - Works across all modes (interactive, print, RPC)
  - Unified session event for hooks (replaces session_start/session_switch)

- Box component added to pi-tui

- Examples bundled in npm and binary releases

Fixes #190
2025-12-17 16:03:23 +01:00

409 lines
12 KiB
TypeScript

/**
* RPC mode: Headless operation with JSON stdin/stdout protocol.
*
* Used for embedding the agent in other applications.
* Receives commands as JSON on stdin, outputs events and responses as JSON on stdout.
*
* Protocol:
* - Commands: JSON objects with `type` field, optional `id` for correlation
* - Responses: JSON objects with `type: "response"`, `command`, `success`, and optional `data`/`error`
* - Events: AgentSessionEvent objects streamed as they occur
* - Hook UI: Hook UI requests are emitted, client responds with hook_ui_response
*/
import * as crypto from "node:crypto";
import * as readline from "readline";
import type { AgentSession } from "../../core/agent-session.js";
import type { HookUIContext } from "../../core/hooks/index.js";
import type { RpcCommand, RpcHookUIRequest, RpcHookUIResponse, RpcResponse, RpcSessionState } from "./rpc-types.js";
// Re-export types for consumers
export type { RpcCommand, RpcHookUIRequest, RpcHookUIResponse, RpcResponse, RpcSessionState } from "./rpc-types.js";
/**
* Run in RPC mode.
* Listens for JSON commands on stdin, outputs events and responses on stdout.
*/
export async function runRpcMode(session: AgentSession): Promise<never> {
const output = (obj: RpcResponse | RpcHookUIRequest | object) => {
console.log(JSON.stringify(obj));
};
const success = <T extends RpcCommand["type"]>(
id: string | undefined,
command: T,
data?: object | null,
): RpcResponse => {
if (data === undefined) {
return { id, type: "response", command, success: true } as RpcResponse;
}
return { id, type: "response", command, success: true, data } as RpcResponse;
};
const error = (id: string | undefined, command: string, message: string): RpcResponse => {
return { id, type: "response", command, success: false, error: message };
};
// Pending hook UI requests waiting for response
const pendingHookRequests = new Map<string, { resolve: (value: any) => void; reject: (error: Error) => void }>();
/**
* Create a hook UI context that uses the RPC protocol.
*/
const createHookUIContext = (): HookUIContext => ({
async select(title: string, options: string[]): Promise<string | null> {
const id = crypto.randomUUID();
return new Promise((resolve, reject) => {
pendingHookRequests.set(id, {
resolve: (response: RpcHookUIResponse) => {
if ("cancelled" in response && response.cancelled) {
resolve(null);
} else if ("value" in response) {
resolve(response.value);
} else {
resolve(null);
}
},
reject,
});
output({ type: "hook_ui_request", id, method: "select", title, options } as RpcHookUIRequest);
});
},
async confirm(title: string, message: string): Promise<boolean> {
const id = crypto.randomUUID();
return new Promise((resolve, reject) => {
pendingHookRequests.set(id, {
resolve: (response: RpcHookUIResponse) => {
if ("cancelled" in response && response.cancelled) {
resolve(false);
} else if ("confirmed" in response) {
resolve(response.confirmed);
} else {
resolve(false);
}
},
reject,
});
output({ type: "hook_ui_request", id, method: "confirm", title, message } as RpcHookUIRequest);
});
},
async input(title: string, placeholder?: string): Promise<string | null> {
const id = crypto.randomUUID();
return new Promise((resolve, reject) => {
pendingHookRequests.set(id, {
resolve: (response: RpcHookUIResponse) => {
if ("cancelled" in response && response.cancelled) {
resolve(null);
} else if ("value" in response) {
resolve(response.value);
} else {
resolve(null);
}
},
reject,
});
output({ type: "hook_ui_request", id, method: "input", title, placeholder } as RpcHookUIRequest);
});
},
notify(message: string, type?: "info" | "warning" | "error"): void {
// Fire and forget - no response needed
output({
type: "hook_ui_request",
id: crypto.randomUUID(),
method: "notify",
message,
notifyType: type,
} as RpcHookUIRequest);
},
});
// Load entries once for session start events
const entries = session.sessionManager.loadEntries();
// Set up hooks with RPC-based UI context
const hookRunner = session.hookRunner;
if (hookRunner) {
hookRunner.setUIContext(createHookUIContext(), false);
hookRunner.setSessionFile(session.sessionFile);
hookRunner.onError((err) => {
output({ type: "hook_error", hookPath: err.hookPath, event: err.event, error: err.error });
});
// Set up send handler for pi.send()
hookRunner.setSendHandler((text, attachments) => {
// In RPC mode, just queue or prompt based on streaming state
if (session.isStreaming) {
session.queueMessage(text);
} else {
session.prompt(text, { attachments }).catch((e) => {
output(error(undefined, "hook_send", e.message));
});
}
});
// Emit session event
await hookRunner.emit({
type: "session",
entries,
sessionFile: session.sessionFile,
previousSessionFile: null,
reason: "start",
});
}
// Emit session start event to custom tools
// Note: Tools get no-op UI context in RPC mode (host handles UI via protocol)
for (const { tool } of session.customTools) {
if (tool.onSession) {
try {
await tool.onSession({
entries,
sessionFile: session.sessionFile,
previousSessionFile: null,
reason: "start",
});
} catch (_err) {
// Silently ignore tool errors
}
}
}
// Output all agent events as JSON
session.subscribe((event) => {
output(event);
});
// Handle a single command
const handleCommand = async (command: RpcCommand): Promise<RpcResponse> => {
const id = command.id;
switch (command.type) {
// =================================================================
// Prompting
// =================================================================
case "prompt": {
// Don't await - events will stream
session
.prompt(command.message, {
attachments: command.attachments,
expandSlashCommands: false,
})
.catch((e) => output(error(id, "prompt", e.message)));
return success(id, "prompt");
}
case "queue_message": {
await session.queueMessage(command.message);
return success(id, "queue_message");
}
case "abort": {
await session.abort();
return success(id, "abort");
}
case "reset": {
await session.reset();
return success(id, "reset");
}
// =================================================================
// State
// =================================================================
case "get_state": {
const state: RpcSessionState = {
model: session.model,
thinkingLevel: session.thinkingLevel,
isStreaming: session.isStreaming,
isCompacting: session.isCompacting,
queueMode: session.queueMode,
sessionFile: session.sessionFile,
sessionId: session.sessionId,
autoCompactionEnabled: session.autoCompactionEnabled,
messageCount: session.messages.length,
queuedMessageCount: session.queuedMessageCount,
};
return success(id, "get_state", state);
}
// =================================================================
// Model
// =================================================================
case "set_model": {
const models = await session.getAvailableModels();
const model = models.find((m) => m.provider === command.provider && m.id === command.modelId);
if (!model) {
return error(id, "set_model", `Model not found: ${command.provider}/${command.modelId}`);
}
await session.setModel(model);
return success(id, "set_model", model);
}
case "cycle_model": {
const result = await session.cycleModel();
if (!result) {
return success(id, "cycle_model", null);
}
return success(id, "cycle_model", result);
}
case "get_available_models": {
const models = await session.getAvailableModels();
return success(id, "get_available_models", { models });
}
// =================================================================
// Thinking
// =================================================================
case "set_thinking_level": {
session.setThinkingLevel(command.level);
return success(id, "set_thinking_level");
}
case "cycle_thinking_level": {
const level = session.cycleThinkingLevel();
if (!level) {
return success(id, "cycle_thinking_level", null);
}
return success(id, "cycle_thinking_level", { level });
}
// =================================================================
// Queue Mode
// =================================================================
case "set_queue_mode": {
session.setQueueMode(command.mode);
return success(id, "set_queue_mode");
}
// =================================================================
// Compaction
// =================================================================
case "compact": {
const result = await session.compact(command.customInstructions);
return success(id, "compact", result);
}
case "set_auto_compaction": {
session.setAutoCompactionEnabled(command.enabled);
return success(id, "set_auto_compaction");
}
// =================================================================
// Retry
// =================================================================
case "set_auto_retry": {
session.setAutoRetryEnabled(command.enabled);
return success(id, "set_auto_retry");
}
case "abort_retry": {
session.abortRetry();
return success(id, "abort_retry");
}
// =================================================================
// Bash
// =================================================================
case "bash": {
const result = await session.executeBash(command.command);
return success(id, "bash", result);
}
case "abort_bash": {
session.abortBash();
return success(id, "abort_bash");
}
// =================================================================
// Session
// =================================================================
case "get_session_stats": {
const stats = session.getSessionStats();
return success(id, "get_session_stats", stats);
}
case "export_html": {
const path = session.exportToHtml(command.outputPath);
return success(id, "export_html", { path });
}
case "switch_session": {
await session.switchSession(command.sessionPath);
return success(id, "switch_session");
}
case "branch": {
const result = await session.branch(command.entryIndex);
return success(id, "branch", { text: result.selectedText, skipped: result.skipped });
}
case "get_branch_messages": {
const messages = session.getUserMessagesForBranching();
return success(id, "get_branch_messages", { messages });
}
case "get_last_assistant_text": {
const text = session.getLastAssistantText();
return success(id, "get_last_assistant_text", { text });
}
// =================================================================
// Messages
// =================================================================
case "get_messages": {
return success(id, "get_messages", { messages: session.messages });
}
default: {
const unknownCommand = command as { type: string };
return error(undefined, unknownCommand.type, `Unknown command: ${unknownCommand.type}`);
}
}
};
// Listen for JSON input
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
rl.on("line", async (line: string) => {
try {
const parsed = JSON.parse(line);
// Handle hook UI responses
if (parsed.type === "hook_ui_response") {
const response = parsed as RpcHookUIResponse;
const pending = pendingHookRequests.get(response.id);
if (pending) {
pendingHookRequests.delete(response.id);
pending.resolve(response);
}
return;
}
// Handle regular commands
const command = parsed as RpcCommand;
const response = await handleCommand(command);
output(response);
} catch (e: any) {
output(error(undefined, "parse", `Failed to parse command: ${e.message}`));
}
});
// Keep process alive forever
return new Promise(() => {});
}