/**
 * RPC Client for programmatic access to the coding agent.
 *
 * Spawns the agent in RPC mode and provides a typed API for all operations.
 * Commands are written to the child's stdin as one JSON object per line;
 * responses and events are read back from its stdout, one JSON object per line.
 */

import { type ChildProcess, spawn } from "node:child_process";
import * as readline from "node:readline";
import type { AgentEvent, AgentMessage, ThinkingLevel } from "@mariozechner/pi-agent-core";
import type { ImageContent } from "@mariozechner/pi-ai";
import type { SessionStats } from "../../core/agent-session.js";
import type { BashResult } from "../../core/bash-executor.js";
import type { CompactionResult } from "../../core/compaction/index.js";
import type { RpcCommand, RpcResponse, RpcSessionState, RpcSlashCommand } from "./rpc-types.js";

// ============================================================================
// Types
// ============================================================================

/** Distributive Omit that works with union types */
type DistributiveOmit<T, K extends keyof T> = T extends unknown ? Omit<T, K> : never;

/** RpcCommand without the id field (for internal send) */
type RpcCommandBody = DistributiveOmit<RpcCommand, "id">;

/** Bookkeeping for a command whose response has not arrived yet. */
interface PendingRequest {
  resolve: (response: RpcResponse) => void;
  reject: (error: Error) => void;
}

export interface RpcClientOptions {
  /** Path to the CLI entry point (defaults to "dist/cli.js", resolved by node relative to `cwd`) */
  cliPath?: string;
  /** Working directory for the agent */
  cwd?: string;
  /** Environment variables (merged over the current process environment) */
  env?: Record<string, string>;
  /** Provider to use */
  provider?: string;
  /** Model ID to use */
  model?: string;
  /** Additional CLI arguments */
  args?: string[];
}

export interface ModelInfo {
  provider: string;
  id: string;
  contextWindow: number;
  reasoning: boolean;
}

export type RpcEventListener = (event: AgentEvent) => void;

// ============================================================================
// RPC Client
// ============================================================================

export class RpcClient {
  private process: ChildProcess | null = null;
  private rl: readline.Interface | null = null;
  private eventListeners: RpcEventListener[] = [];
  private pendingRequests: Map<string, PendingRequest> = new Map();
  private requestId = 0;
  private stderr = "";

  constructor(private options: RpcClientOptions = {}) {}

  /**
   * Start the RPC agent process.
   *
   * Spawns `node <cliPath> --mode rpc ...`, wires up stdout/stderr, then waits
   * 100 ms to detect a process that dies right away.
   *
   * @throws Error if the client is already started, the process cannot be
   *   spawned, or it exits during the startup window. In the failure cases the
   *   client is reset so `start()` can be called again.
   */
  async start(): Promise<void> {
    if (this.process) {
      throw new Error("Client already started");
    }

    const cliPath = this.options.cliPath ?? "dist/cli.js";
    const args = ["--mode", "rpc"];

    if (this.options.provider) {
      args.push("--provider", this.options.provider);
    }
    if (this.options.model) {
      args.push("--model", this.options.model);
    }
    if (this.options.args) {
      args.push(...this.options.args);
    }

    const child: ChildProcess = spawn("node", [cliPath, ...args], {
      cwd: this.options.cwd,
      env: { ...process.env, ...this.options.env },
      stdio: ["pipe", "pipe", "pipe"],
    });
    this.process = child;

    // Without an "error" listener a spawn failure (e.g. node not on PATH) would
    // be thrown as an uncaught exception in the host process.
    const startup: { done: boolean; error?: Error } = { done: false };
    child.on("error", (error) => {
      if (startup.done) {
        process.emitWarning(error);
      } else {
        startup.error = error;
      }
    });

    // Once the agent is gone no response can arrive: fail waiting callers now
    // instead of letting each one run into its own timeout.
    child.once("exit", (code, signal) => {
      if (this.process === child) {
        this.rejectAllPending(
          new Error(`Agent process exited (code ${code}, signal ${signal}). Stderr: ${this.stderr}`),
        );
      }
    });

    // A write to a closed pipe emits "error" on stdin; handle it so it does not
    // crash the host, and fail the requests that can no longer be answered.
    child.stdin?.on("error", (error) => {
      if (this.process === child) {
        this.rejectAllPending(error);
      }
    });

    // Collect stderr for debugging
    child.stderr?.on("data", (data) => {
      this.stderr += data.toString();
    });

    const stdout = child.stdout;
    if (!stdout) {
      child.kill();
      this.process = null;
      throw new Error("Agent process was spawned without a stdout pipe");
    }

    // Set up line reader for stdout
    const reader = readline.createInterface({ input: stdout, terminal: false });
    this.rl = reader;
    reader.on("line", (line) => {
      this.handleLine(line);
    });

    // Wait a moment for process to initialize
    await new Promise<void>((resolve) => setTimeout(resolve, 100));
    startup.done = true;

    if (startup.error !== undefined || child.exitCode !== null) {
      // Reset state so a later start() is not rejected as "already started".
      reader.close();
      this.rl = null;
      this.process = null;

      if (startup.error !== undefined) {
        throw new Error(`Failed to spawn agent process: ${startup.error.message}. Stderr: ${this.stderr}`);
      }
      throw new Error(`Agent process exited immediately with code ${child.exitCode}. Stderr: ${this.stderr}`);
    }
  }

  /**
   * Stop the RPC agent process.
   *
   * Sends SIGTERM and waits for the process to exit, escalating to SIGKILL
   * after one second. Requests still waiting for a response are rejected.
   * Does nothing if the client is not started.
   */
  async stop(): Promise<void> {
    const child = this.process;
    if (!child) return;

    this.rl?.close();

    // If the process already exited no "exit" event will fire again, so only
    // wait when it is still running.
    if (child.exitCode === null && child.signalCode === null) {
      await new Promise<void>((resolve) => {
        const forceKill = setTimeout(() => {
          child.kill("SIGKILL");
          resolve();
        }, 1000);

        child.once("exit", () => {
          clearTimeout(forceKill);
          resolve();
        });

        child.kill("SIGTERM");
      });
    }

    this.process = null;
    this.rl = null;
    this.rejectAllPending(new Error("Client stopped"));
  }

  /**
   * Subscribe to agent events.
   *
   * @returns A function that removes the listener again.
   */
  onEvent(listener: RpcEventListener): () => void {
    this.eventListeners.push(listener);
    return () => {
      const index = this.eventListeners.indexOf(listener);
      if (index !== -1) {
        this.eventListeners.splice(index, 1);
      }
    };
  }

  /**
   * Get collected stderr output (useful for debugging).
   */
  getStderr(): string {
    return this.stderr;
  }

  // =========================================================================
  // Command Methods
  // =========================================================================

  /**
   * Send a prompt to the agent.
   * Resolves when the response to the command arrives; it does not wait for
   * the run to finish. Use onEvent() to receive streaming events and
   * waitForIdle() to wait for completion.
   */
  async prompt(message: string, images?: ImageContent[]): Promise<void> {
    await this.send({ type: "prompt", message, images });
  }

  /**
   * Queue a steering message to interrupt the agent mid-run.
   */
  async steer(message: string): Promise<void> {
    await this.send({ type: "steer", message });
  }

  /**
   * Queue a follow-up message to be processed after the agent finishes.
   */
  async followUp(message: string): Promise<void> {
    await this.send({ type: "follow_up", message });
  }

  /**
   * Abort current operation.
   */
  async abort(): Promise<void> {
    await this.send({ type: "abort" });
  }

  /**
   * Start a new session, optionally with parent tracking.
   * @param parentSession - Optional parent session path for lineage tracking
   * @returns Object with `cancelled: true` if an extension cancelled the new session
   */
  async newSession(parentSession?: string): Promise<{ cancelled: boolean }> {
    const response = await this.send({ type: "new_session", parentSession });
    return this.getData(response);
  }

  /**
   * Get current session state.
   */
  async getState(): Promise<RpcSessionState> {
    const response = await this.send({ type: "get_state" });
    return this.getData(response);
  }

  /**
   * Set model by provider and ID.
   */
  async setModel(provider: string, modelId: string): Promise<{ provider: string; id: string }> {
    const response = await this.send({ type: "set_model", provider, modelId });
    return this.getData(response);
  }

  /**
   * Cycle to next model.
   */
  async cycleModel(): Promise<{
    model: { provider: string; id: string };
    thinkingLevel: ThinkingLevel;
    isScoped: boolean;
  } | null> {
    const response = await this.send({ type: "cycle_model" });
    return this.getData(response);
  }

  /**
   * Get list of available models.
   */
  async getAvailableModels(): Promise<ModelInfo[]> {
    const response = await this.send({ type: "get_available_models" });
    return this.getData<{ models: ModelInfo[] }>(response).models;
  }

  /**
   * Set thinking level.
   */
  async setThinkingLevel(level: ThinkingLevel): Promise<void> {
    await this.send({ type: "set_thinking_level", level });
  }

  /**
   * Cycle thinking level.
   */
  async cycleThinkingLevel(): Promise<{ level: ThinkingLevel } | null> {
    const response = await this.send({ type: "cycle_thinking_level" });
    return this.getData(response);
  }

  /**
   * Set steering mode.
   */
  async setSteeringMode(mode: "all" | "one-at-a-time"): Promise<void> {
    await this.send({ type: "set_steering_mode", mode });
  }

  /**
   * Set follow-up mode.
   */
  async setFollowUpMode(mode: "all" | "one-at-a-time"): Promise<void> {
    await this.send({ type: "set_follow_up_mode", mode });
  }

  /**
   * Compact session context.
   */
  async compact(customInstructions?: string): Promise<CompactionResult> {
    const response = await this.send({ type: "compact", customInstructions });
    return this.getData(response);
  }

  /**
   * Set auto-compaction enabled/disabled.
   */
  async setAutoCompaction(enabled: boolean): Promise<void> {
    await this.send({ type: "set_auto_compaction", enabled });
  }

  /**
   * Set auto-retry enabled/disabled.
   */
  async setAutoRetry(enabled: boolean): Promise<void> {
    await this.send({ type: "set_auto_retry", enabled });
  }

  /**
   * Abort in-progress retry.
   */
  async abortRetry(): Promise<void> {
    await this.send({ type: "abort_retry" });
  }

  /**
   * Execute a bash command.
   */
  async bash(command: string): Promise<BashResult> {
    const response = await this.send({ type: "bash", command });
    return this.getData(response);
  }

  /**
   * Abort running bash command.
   */
  async abortBash(): Promise<void> {
    await this.send({ type: "abort_bash" });
  }

  /**
   * Get session statistics.
   */
  async getSessionStats(): Promise<SessionStats> {
    const response = await this.send({ type: "get_session_stats" });
    return this.getData(response);
  }

  /**
   * Export session to HTML.
   */
  async exportHtml(outputPath?: string): Promise<{ path: string }> {
    const response = await this.send({ type: "export_html", outputPath });
    return this.getData(response);
  }

  /**
   * Switch to a different session file.
   * @returns Object with `cancelled: true` if an extension cancelled the switch
   */
  async switchSession(sessionPath: string): Promise<{ cancelled: boolean }> {
    const response = await this.send({ type: "switch_session", sessionPath });
    return this.getData(response);
  }

  /**
   * Fork from a specific message.
   * @returns Object with `text` (the message text) and `cancelled` (if extension cancelled)
   */
  async fork(entryId: string): Promise<{ text: string; cancelled: boolean }> {
    const response = await this.send({ type: "fork", entryId });
    return this.getData(response);
  }

  /**
   * Get messages available for forking.
   */
  async getForkMessages(): Promise<Array<{ entryId: string; text: string }>> {
    const response = await this.send({ type: "get_fork_messages" });
    return this.getData<{ messages: Array<{ entryId: string; text: string }> }>(response).messages;
  }

  /**
   * Get text of last assistant message.
   */
  async getLastAssistantText(): Promise<string | null> {
    const response = await this.send({ type: "get_last_assistant_text" });
    return this.getData<{ text: string | null }>(response).text;
  }

  /**
   * Set the session display name.
   */
  async setSessionName(name: string): Promise<void> {
    await this.send({ type: "set_session_name", name });
  }

  /**
   * Get all messages in the session.
   */
  async getMessages(): Promise<AgentMessage[]> {
    const response = await this.send({ type: "get_messages" });
    return this.getData<{ messages: AgentMessage[] }>(response).messages;
  }

  /**
   * Get available commands (extension commands, prompt templates, skills).
   */
  async getCommands(): Promise<RpcSlashCommand[]> {
    const response = await this.send({ type: "get_commands" });
    return this.getData<{ commands: RpcSlashCommand[] }>(response).commands;
  }

  // =========================================================================
  // Helpers
  // =========================================================================

  /**
   * Wait for agent to become idle (no streaming).
   * Resolves when an `agent_end` event is received; rejects after `timeout`
   * milliseconds if none arrives.
   */
  waitForIdle(timeout = 60000): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const timer = setTimeout(() => {
        unsubscribe();
        reject(new Error(`Timeout waiting for agent to become idle. Stderr: ${this.stderr}`));
      }, timeout);

      const unsubscribe = this.onEvent((event) => {
        if (event.type === "agent_end") {
          clearTimeout(timer);
          unsubscribe();
          resolve();
        }
      });
    });
  }

  /**
   * Collect events until agent becomes idle.
   * Resolves with every event seen up to and including `agent_end`; rejects
   * after `timeout` milliseconds if `agent_end` never arrives.
   */
  collectEvents(timeout = 60000): Promise<AgentEvent[]> {
    return this.collectUntilIdle(timeout).promise;
  }

  /**
   * Send prompt and wait for completion, returning all events.
   *
   * Collection starts before the prompt is sent so no early event is missed.
   * If sending the prompt fails, collection is cancelled and the send error is
   * rethrown.
   */
  async promptAndWait(message: string, images?: ImageContent[], timeout = 60000): Promise<AgentEvent[]> {
    const collection = this.collectUntilIdle(timeout);
    try {
      await this.prompt(message, images);
    } catch (error) {
      // Otherwise the collector's timer would fire later and reject a promise
      // nobody is listening to.
      collection.cancel();
      throw error;
    }
    return collection.promise;
  }

  // =========================================================================
  // Internal
  // =========================================================================

  /**
   * Start gathering events until `agent_end`.
   * `cancel` disarms the timeout and removes the listener; the promise then
   * stays unsettled.
   */
  private collectUntilIdle(timeout: number): { promise: Promise<AgentEvent[]>; cancel: () => void } {
    const events: AgentEvent[] = [];
    let timer: ReturnType<typeof setTimeout> | undefined;
    let unsubscribe: () => void = () => {};

    const cancel = (): void => {
      clearTimeout(timer);
      unsubscribe();
    };

    const promise = new Promise<AgentEvent[]>((resolve, reject) => {
      timer = setTimeout(() => {
        unsubscribe();
        reject(new Error(`Timeout collecting events. Stderr: ${this.stderr}`));
      }, timeout);

      unsubscribe = this.onEvent((event) => {
        events.push(event);
        if (event.type === "agent_end") {
          cancel();
          resolve(events);
        }
      });
    });

    return { promise, cancel };
  }

  /**
   * Handle one line of agent stdout.
   *
   * Lines that are not JSON objects are ignored. A `response` whose id matches
   * a pending request settles that request; everything else is delivered to
   * the event listeners.
   */
  private handleLine(line: string): void {
    let data: unknown;
    try {
      data = JSON.parse(line);
    } catch {
      // Ignore non-JSON lines
      return;
    }
    if (typeof data !== "object" || data === null) {
      return;
    }

    // Check if it's a response to a pending request
    const message = data as { type?: unknown; id?: unknown };
    if (message.type === "response" && typeof message.id === "string") {
      const pending = this.pendingRequests.get(message.id);
      if (pending) {
        this.pendingRequests.delete(message.id);
        pending.resolve(data as RpcResponse);
        return;
      }
    }

    // Otherwise it's an event. Iterate over a snapshot: listeners commonly
    // unsubscribe themselves while handling an event, and splicing the live
    // array mid-iteration would skip the listener that follows.
    for (const listener of [...this.eventListeners]) {
      try {
        listener(data as AgentEvent);
      } catch (error) {
        // One faulty listener must not starve the others or break the reader.
        process.emitWarning(error instanceof Error ? error : new Error(String(error)));
      }
    }
  }

  /**
   * Write a command to the agent and wait for the matching response.
   *
   * @throws Error if the client is not started.
   * @returns The raw response; rejects if no response arrives within 30 s, the
   *   write fails, or the client is stopped or the agent exits first.
   */
  private async send(command: RpcCommandBody): Promise<RpcResponse> {
    const stdin = this.process?.stdin;
    if (!stdin) {
      throw new Error("Client not started");
    }

    const id = `req_${++this.requestId}`;
    const fullCommand = { ...command, id } as RpcCommand;

    return new Promise<RpcResponse>((resolve, reject) => {
      const timeout = setTimeout(() => {
        this.pendingRequests.delete(id);
        reject(new Error(`Timeout waiting for response to ${command.type}. Stderr: ${this.stderr}`));
      }, 30000);

      this.pendingRequests.set(id, {
        resolve: (response) => {
          clearTimeout(timeout);
          resolve(response);
        },
        reject: (error) => {
          clearTimeout(timeout);
          reject(error);
        },
      });

      stdin.write(`${JSON.stringify(fullCommand)}\n`, (error) => {
        if (!error) return;
        // The command never reached the agent, so no response will come.
        const pending = this.pendingRequests.get(id);
        if (pending) {
          this.pendingRequests.delete(id);
          pending.reject(error);
        }
      });
    });
  }

  /** Reject every request still waiting for a response and forget them. */
  private rejectAllPending(error: Error): void {
    const waiting = [...this.pendingRequests.values()];
    this.pendingRequests.clear();
    for (const request of waiting) {
      request.reject(error);
    }
  }

  /**
   * Unwrap the payload of a response.
   *
   * @throws Error carrying the response's `error` text when `success` is false.
   */
  private getData<T>(response: RpcResponse): T {
    if (!response.success) {
      const errorResponse = response as Extract<RpcResponse, { success: false }>;
      throw new Error(errorResponse.error);
    }
    // Type assertion: we trust response.data matches T based on the command sent.
    // This is safe because each public method specifies the correct T for its command.
    const successResponse = response as Extract<RpcResponse, { success: true; data: unknown }>;
    return successResponse.data as T;
  }
}