diff --git a/AGENTS.md b/AGENTS.md index 748ee7ab..5590f165 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -22,6 +22,8 @@ read README.md, then ask which module(s) to work on. Based on the answer, read t - NEVER commit unless user asks ## GitHub Issues +When reading issues: +- Always read all comments on the issue When creating issues: - Add `pkg:*` labels to indicate which package(s) the issue affects diff --git a/packages/coding-agent/src/modes/index.ts b/packages/coding-agent/src/modes/index.ts index 696076e0..67168d16 100644 --- a/packages/coding-agent/src/modes/index.ts +++ b/packages/coding-agent/src/modes/index.ts @@ -4,5 +4,6 @@ export { InteractiveMode } from "./interactive/interactive-mode.js"; export { runPrintMode } from "./print-mode.js"; -export { runRpcMode } from "./rpc-mode.js"; -// InteractiveMode will be added in WP15 +export { type ModelInfo, RpcClient, type RpcClientOptions, type RpcEventListener } from "./rpc/rpc-client.js"; +export { runRpcMode } from "./rpc/rpc-mode.js"; +export type { RpcCommand, RpcResponse, RpcSessionState } from "./rpc/rpc-types.js"; diff --git a/packages/coding-agent/src/modes/rpc-mode.ts b/packages/coding-agent/src/modes/rpc-mode.ts deleted file mode 100644 index 4986b903..00000000 --- a/packages/coding-agent/src/modes/rpc-mode.ts +++ /dev/null @@ -1,84 +0,0 @@ -/** - * RPC mode: Headless operation with JSON stdin/stdout protocol. - * - * Used for embedding the agent in other applications. - * Receives commands as JSON on stdin, outputs events as JSON on stdout. - */ - -import * as readline from "readline"; -import type { AgentSession } from "../core/agent-session.js"; - -/** - * Run in RPC mode. - * Listens for JSON commands on stdin, outputs events on stdout. - * - * Commands: - * - { type: "prompt", message: string, attachments?: Attachment[] } - * - { type: "abort" } - * - { type: "compact", customInstructions?: string } - * - { type: "bash", command: string } - * - * Events are output as JSON lines (same format as session manager). - */ -export async function runRpcMode(session: AgentSession): Promise { - // Output all agent events as JSON - session.subscribe((event) => { - console.log(JSON.stringify(event)); - }); - - // Listen for JSON input - const rl = readline.createInterface({ - input: process.stdin, - output: process.stdout, - terminal: false, - }); - - rl.on("line", async (line: string) => { - try { - const input = JSON.parse(line); - - switch (input.type) { - case "prompt": - if (input.message) { - await session.prompt(input.message, { - attachments: input.attachments, - expandSlashCommands: false, // RPC mode doesn't expand slash commands - }); - } - break; - - case "abort": - await session.abort(); - break; - - case "compact": - try { - const result = await session.compact(input.customInstructions); - console.log(JSON.stringify({ type: "compaction", ...result })); - } catch (error: any) { - console.log(JSON.stringify({ type: "error", error: `Compaction failed: ${error.message}` })); - } - break; - - case "bash": - if (input.command) { - try { - const result = await session.executeBash(input.command); - console.log(JSON.stringify({ type: "bash_end", ...result })); - } catch (error: any) { - console.log(JSON.stringify({ type: "error", error: `Bash failed: ${error.message}` })); - } - } - break; - - default: - console.log(JSON.stringify({ type: "error", error: `Unknown command: ${input.type}` })); - } - } catch (error: any) { - console.log(JSON.stringify({ type: "error", error: error.message })); - } - }); - - // Keep process alive forever - return new Promise(() => {}); -} diff --git a/packages/coding-agent/src/modes/rpc/rpc-client.ts b/packages/coding-agent/src/modes/rpc/rpc-client.ts new file mode 100644 index 00000000..b7c5991a --- /dev/null +++ b/packages/coding-agent/src/modes/rpc/rpc-client.ts @@ -0,0 +1,451 @@ +/** + * RPC Client for programmatic access to the coding agent. + * + * Spawns the agent in RPC mode and provides a typed API for all operations. + */ + +import { type ChildProcess, spawn } from "node:child_process"; +import * as readline from "node:readline"; +import type { AgentEvent, Attachment, ThinkingLevel } from "@mariozechner/pi-agent-core"; +import type { CompactionResult, SessionStats } from "../../core/agent-session.js"; +import type { BashResult } from "../../core/bash-executor.js"; +import type { RpcCommand, RpcResponse, RpcSessionState } from "./rpc-types.js"; + +// ============================================================================ +// Types +// ============================================================================ + +/** Distributive Omit that works with union types */ +type DistributiveOmit = T extends unknown ? Omit : never; + +/** RpcCommand without the id field (for internal send) */ +type RpcCommandBody = DistributiveOmit; + +export interface RpcClientOptions { + /** Path to the CLI entry point (default: searches for dist/cli.js) */ + cliPath?: string; + /** Working directory for the agent */ + cwd?: string; + /** Environment variables */ + env?: Record; + /** Provider to use */ + provider?: string; + /** Model ID to use */ + model?: string; + /** Additional CLI arguments */ + args?: string[]; +} + +export interface ModelInfo { + provider: string; + id: string; + contextWindow: number; + reasoning: boolean; +} + +export type RpcEventListener = (event: AgentEvent) => void; + +// ============================================================================ +// RPC Client +// ============================================================================ + +export class RpcClient { + private process: ChildProcess | null = null; + private rl: readline.Interface | null = null; + private eventListeners: RpcEventListener[] = []; + private pendingRequests: Map void; reject: (error: Error) => void }> = + new Map(); + private requestId = 0; + private stderr = ""; + + constructor(private options: RpcClientOptions = {}) {} + + /** + * Start the RPC agent process. + */ + async start(): Promise { + if (this.process) { + throw new Error("Client already started"); + } + + const cliPath = this.options.cliPath ?? "dist/cli.js"; + const args = ["--mode", "rpc"]; + + if (this.options.provider) { + args.push("--provider", this.options.provider); + } + if (this.options.model) { + args.push("--model", this.options.model); + } + if (this.options.args) { + args.push(...this.options.args); + } + + this.process = spawn("node", [cliPath, ...args], { + cwd: this.options.cwd, + env: { ...process.env, ...this.options.env }, + stdio: ["pipe", "pipe", "pipe"], + }); + + // Collect stderr for debugging + this.process.stderr?.on("data", (data) => { + this.stderr += data.toString(); + }); + + // Set up line reader for stdout + this.rl = readline.createInterface({ + input: this.process.stdout!, + terminal: false, + }); + + this.rl.on("line", (line) => { + this.handleLine(line); + }); + + // Wait a moment for process to initialize + await new Promise((resolve) => setTimeout(resolve, 100)); + + if (this.process.exitCode !== null) { + throw new Error(`Agent process exited immediately with code ${this.process.exitCode}. Stderr: ${this.stderr}`); + } + } + + /** + * Stop the RPC agent process. + */ + async stop(): Promise { + if (!this.process) return; + + this.rl?.close(); + this.process.kill("SIGTERM"); + + // Wait for process to exit + await new Promise((resolve) => { + const timeout = setTimeout(() => { + this.process?.kill("SIGKILL"); + resolve(); + }, 1000); + + this.process?.on("exit", () => { + clearTimeout(timeout); + resolve(); + }); + }); + + this.process = null; + this.rl = null; + this.pendingRequests.clear(); + } + + /** + * Subscribe to agent events. + */ + onEvent(listener: RpcEventListener): () => void { + this.eventListeners.push(listener); + return () => { + const index = this.eventListeners.indexOf(listener); + if (index !== -1) { + this.eventListeners.splice(index, 1); + } + }; + } + + /** + * Get collected stderr output (useful for debugging). + */ + getStderr(): string { + return this.stderr; + } + + // ========================================================================= + // Command Methods + // ========================================================================= + + /** + * Send a prompt to the agent. + * Returns immediately after sending; use onEvent() to receive streaming events. + * Use waitForIdle() to wait for completion. + */ + async prompt(message: string, attachments?: Attachment[]): Promise { + await this.send({ type: "prompt", message, attachments }); + } + + /** + * Queue a message while agent is streaming. + */ + async queueMessage(message: string): Promise { + await this.send({ type: "queue_message", message }); + } + + /** + * Abort current operation. + */ + async abort(): Promise { + await this.send({ type: "abort" }); + } + + /** + * Reset session (clear all messages). + */ + async reset(): Promise { + await this.send({ type: "reset" }); + } + + /** + * Get current session state. + */ + async getState(): Promise { + const response = await this.send({ type: "get_state" }); + return this.getData(response); + } + + /** + * Set model by provider and ID. + */ + async setModel(provider: string, modelId: string): Promise<{ provider: string; id: string }> { + const response = await this.send({ type: "set_model", provider, modelId }); + return this.getData(response); + } + + /** + * Cycle to next model. + */ + async cycleModel(): Promise<{ + model: { provider: string; id: string }; + thinkingLevel: ThinkingLevel; + isScoped: boolean; + } | null> { + const response = await this.send({ type: "cycle_model" }); + return this.getData(response); + } + + /** + * Get list of available models. + */ + async getAvailableModels(): Promise { + const response = await this.send({ type: "get_available_models" }); + return this.getData<{ models: ModelInfo[] }>(response).models; + } + + /** + * Set thinking level. + */ + async setThinkingLevel(level: ThinkingLevel): Promise { + await this.send({ type: "set_thinking_level", level }); + } + + /** + * Cycle thinking level. + */ + async cycleThinkingLevel(): Promise<{ level: ThinkingLevel } | null> { + const response = await this.send({ type: "cycle_thinking_level" }); + return this.getData(response); + } + + /** + * Set queue mode. + */ + async setQueueMode(mode: "all" | "one-at-a-time"): Promise { + await this.send({ type: "set_queue_mode", mode }); + } + + /** + * Compact session context. + */ + async compact(customInstructions?: string): Promise { + const response = await this.send({ type: "compact", customInstructions }); + return this.getData(response); + } + + /** + * Set auto-compaction enabled/disabled. + */ + async setAutoCompaction(enabled: boolean): Promise { + await this.send({ type: "set_auto_compaction", enabled }); + } + + /** + * Execute a bash command. + */ + async bash(command: string): Promise { + const response = await this.send({ type: "bash", command }); + return this.getData(response); + } + + /** + * Abort running bash command. + */ + async abortBash(): Promise { + await this.send({ type: "abort_bash" }); + } + + /** + * Get session statistics. + */ + async getSessionStats(): Promise { + const response = await this.send({ type: "get_session_stats" }); + return this.getData(response); + } + + /** + * Export session to HTML. + */ + async exportHtml(outputPath?: string): Promise<{ path: string }> { + const response = await this.send({ type: "export_html", outputPath }); + return this.getData(response); + } + + /** + * Switch to a different session file. + */ + async switchSession(sessionPath: string): Promise { + await this.send({ type: "switch_session", sessionPath }); + } + + /** + * Branch from a specific message. + */ + async branch(entryIndex: number): Promise<{ text: string }> { + const response = await this.send({ type: "branch", entryIndex }); + return this.getData(response); + } + + /** + * Get messages available for branching. + */ + async getBranchMessages(): Promise> { + const response = await this.send({ type: "get_branch_messages" }); + return this.getData<{ messages: Array<{ entryIndex: number; text: string }> }>(response).messages; + } + + /** + * Get text of last assistant message. + */ + async getLastAssistantText(): Promise { + const response = await this.send({ type: "get_last_assistant_text" }); + return this.getData<{ text: string | null }>(response).text; + } + + // ========================================================================= + // Helpers + // ========================================================================= + + /** + * Wait for agent to become idle (no streaming). + * Resolves when agent_end event is received. + */ + waitForIdle(timeout = 60000): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + unsubscribe(); + reject(new Error(`Timeout waiting for agent to become idle. Stderr: ${this.stderr}`)); + }, timeout); + + const unsubscribe = this.onEvent((event) => { + if (event.type === "agent_end") { + clearTimeout(timer); + unsubscribe(); + resolve(); + } + }); + }); + } + + /** + * Collect events until agent becomes idle. + */ + collectEvents(timeout = 60000): Promise { + return new Promise((resolve, reject) => { + const events: AgentEvent[] = []; + const timer = setTimeout(() => { + unsubscribe(); + reject(new Error(`Timeout collecting events. Stderr: ${this.stderr}`)); + }, timeout); + + const unsubscribe = this.onEvent((event) => { + events.push(event); + if (event.type === "agent_end") { + clearTimeout(timer); + unsubscribe(); + resolve(events); + } + }); + }); + } + + /** + * Send prompt and wait for completion, returning all events. + */ + async promptAndWait(message: string, attachments?: Attachment[], timeout = 60000): Promise { + const eventsPromise = this.collectEvents(timeout); + await this.prompt(message, attachments); + return eventsPromise; + } + + // ========================================================================= + // Internal + // ========================================================================= + + private handleLine(line: string): void { + try { + const data = JSON.parse(line); + + // Check if it's a response to a pending request + if (data.type === "response" && data.id && this.pendingRequests.has(data.id)) { + const pending = this.pendingRequests.get(data.id)!; + this.pendingRequests.delete(data.id); + pending.resolve(data as RpcResponse); + return; + } + + // Otherwise it's an event + for (const listener of this.eventListeners) { + listener(data as AgentEvent); + } + } catch { + // Ignore non-JSON lines + } + } + + private async send(command: RpcCommandBody): Promise { + if (!this.process?.stdin) { + throw new Error("Client not started"); + } + + const id = `req_${++this.requestId}`; + const fullCommand = { ...command, id } as RpcCommand; + + return new Promise((resolve, reject) => { + this.pendingRequests.set(id, { resolve, reject }); + + const timeout = setTimeout(() => { + this.pendingRequests.delete(id); + reject(new Error(`Timeout waiting for response to ${command.type}. Stderr: ${this.stderr}`)); + }, 30000); + + this.pendingRequests.set(id, { + resolve: (response) => { + clearTimeout(timeout); + resolve(response); + }, + reject: (error) => { + clearTimeout(timeout); + reject(error); + }, + }); + + this.process!.stdin!.write(JSON.stringify(fullCommand) + "\n"); + }); + } + + private getData(response: RpcResponse): T { + if (!response.success) { + const errorResponse = response as Extract; + throw new Error(errorResponse.error); + } + // Type assertion: we trust response.data matches T based on the command sent. + // This is safe because each public method specifies the correct T for its command. + const successResponse = response as Extract; + return successResponse.data as T; + } +} diff --git a/packages/coding-agent/src/modes/rpc/rpc-mode.ts b/packages/coding-agent/src/modes/rpc/rpc-mode.ts new file mode 100644 index 00000000..f85e4f43 --- /dev/null +++ b/packages/coding-agent/src/modes/rpc/rpc-mode.ts @@ -0,0 +1,256 @@ +/** + * RPC mode: Headless operation with JSON stdin/stdout protocol. + * + * Used for embedding the agent in other applications. + * Receives commands as JSON on stdin, outputs events and responses as JSON on stdout. + * + * Protocol: + * - Commands: JSON objects with `type` field, optional `id` for correlation + * - Responses: JSON objects with `type: "response"`, `command`, `success`, and optional `data`/`error` + * - Events: AgentSessionEvent objects streamed as they occur + */ + +import * as readline from "readline"; +import type { AgentSession } from "../../core/agent-session.js"; +import type { RpcCommand, RpcResponse, RpcSessionState } from "./rpc-types.js"; + +// Re-export types for consumers +export type { RpcCommand, RpcResponse, RpcSessionState } from "./rpc-types.js"; + +/** + * Run in RPC mode. + * Listens for JSON commands on stdin, outputs events and responses on stdout. + */ +export async function runRpcMode(session: AgentSession): Promise { + const output = (obj: RpcResponse | object) => { + console.log(JSON.stringify(obj)); + }; + + const success = ( + id: string | undefined, + command: T, + data?: object | null, + ): RpcResponse => { + if (data === undefined) { + return { id, type: "response", command, success: true } as RpcResponse; + } + return { id, type: "response", command, success: true, data } as RpcResponse; + }; + + const error = (id: string | undefined, command: string, message: string): RpcResponse => { + return { id, type: "response", command, success: false, error: message }; + }; + + // Output all agent events as JSON + session.subscribe((event) => { + output(event); + }); + + // Handle a single command + const handleCommand = async (command: RpcCommand): Promise => { + const id = command.id; + + switch (command.type) { + // ================================================================= + // Prompting + // ================================================================= + + case "prompt": { + // Don't await - events will stream + session + .prompt(command.message, { + attachments: command.attachments, + expandSlashCommands: false, + }) + .catch((e) => output(error(id, "prompt", e.message))); + return success(id, "prompt"); + } + + case "queue_message": { + await session.queueMessage(command.message); + return success(id, "queue_message"); + } + + case "abort": { + await session.abort(); + return success(id, "abort"); + } + + case "reset": { + await session.reset(); + return success(id, "reset"); + } + + // ================================================================= + // State + // ================================================================= + + case "get_state": { + const model = session.model; + const state: RpcSessionState = { + model: model ? { provider: model.provider, id: model.id, contextWindow: model.contextWindow } : null, + thinkingLevel: session.thinkingLevel, + isStreaming: session.isStreaming, + queueMode: session.queueMode, + sessionFile: session.sessionFile, + sessionId: session.sessionId, + autoCompactionEnabled: session.autoCompactionEnabled, + messageCount: session.messages.length, + queuedMessageCount: session.queuedMessageCount, + }; + return success(id, "get_state", state); + } + + // ================================================================= + // Model + // ================================================================= + + case "set_model": { + const models = await session.getAvailableModels(); + const model = models.find((m) => m.provider === command.provider && m.id === command.modelId); + if (!model) { + return error(id, "set_model", `Model not found: ${command.provider}/${command.modelId}`); + } + await session.setModel(model); + return success(id, "set_model", { provider: model.provider, id: model.id }); + } + + case "cycle_model": { + const result = await session.cycleModel(); + if (!result) { + return success(id, "cycle_model", null); + } + return success(id, "cycle_model", { + model: { provider: result.model.provider, id: result.model.id }, + thinkingLevel: result.thinkingLevel, + isScoped: result.isScoped, + }); + } + + case "get_available_models": { + const models = await session.getAvailableModels(); + return success(id, "get_available_models", { + models: models.map((m) => ({ + provider: m.provider, + id: m.id, + contextWindow: m.contextWindow, + reasoning: !!m.reasoning, + })), + }); + } + + // ================================================================= + // Thinking + // ================================================================= + + case "set_thinking_level": { + session.setThinkingLevel(command.level); + return success(id, "set_thinking_level"); + } + + case "cycle_thinking_level": { + const level = session.cycleThinkingLevel(); + if (!level) { + return success(id, "cycle_thinking_level", null); + } + return success(id, "cycle_thinking_level", { level }); + } + + // ================================================================= + // Queue Mode + // ================================================================= + + case "set_queue_mode": { + session.setQueueMode(command.mode); + return success(id, "set_queue_mode"); + } + + // ================================================================= + // Compaction + // ================================================================= + + case "compact": { + const result = await session.compact(command.customInstructions); + return success(id, "compact", result); + } + + case "set_auto_compaction": { + session.setAutoCompactionEnabled(command.enabled); + return success(id, "set_auto_compaction"); + } + + // ================================================================= + // Bash + // ================================================================= + + case "bash": { + const result = await session.executeBash(command.command); + return success(id, "bash", result); + } + + case "abort_bash": { + session.abortBash(); + return success(id, "abort_bash"); + } + + // ================================================================= + // Session + // ================================================================= + + case "get_session_stats": { + const stats = session.getSessionStats(); + return success(id, "get_session_stats", stats); + } + + case "export_html": { + const path = session.exportToHtml(command.outputPath); + return success(id, "export_html", { path }); + } + + case "switch_session": { + await session.switchSession(command.sessionPath); + return success(id, "switch_session"); + } + + case "branch": { + const text = session.branch(command.entryIndex); + return success(id, "branch", { text }); + } + + case "get_branch_messages": { + const messages = session.getUserMessagesForBranching(); + return success(id, "get_branch_messages", { messages }); + } + + case "get_last_assistant_text": { + const text = session.getLastAssistantText(); + return success(id, "get_last_assistant_text", { text }); + } + + default: { + const unknownCommand = command as { type: string }; + return error(undefined, unknownCommand.type, `Unknown command: ${unknownCommand.type}`); + } + } + }; + + // Listen for JSON input + const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, + terminal: false, + }); + + rl.on("line", async (line: string) => { + try { + const command = JSON.parse(line) as RpcCommand; + const response = await handleCommand(command); + output(response); + } catch (e: any) { + output(error(undefined, "parse", `Failed to parse command: ${e.message}`)); + } + }); + + // Keep process alive forever + return new Promise(() => {}); +} diff --git a/packages/coding-agent/src/modes/rpc/rpc-types.ts b/packages/coding-agent/src/modes/rpc/rpc-types.ts new file mode 100644 index 00000000..9c112b38 --- /dev/null +++ b/packages/coding-agent/src/modes/rpc/rpc-types.ts @@ -0,0 +1,156 @@ +/** + * RPC protocol types for headless operation. + * + * Commands are sent as JSON lines on stdin. + * Responses and events are emitted as JSON lines on stdout. + */ + +import type { Attachment, ThinkingLevel } from "@mariozechner/pi-agent-core"; +import type { CompactionResult, SessionStats } from "../../core/agent-session.js"; +import type { BashResult } from "../../core/bash-executor.js"; + +// ============================================================================ +// RPC Commands (stdin) +// ============================================================================ + +export type RpcCommand = + // Prompting + | { id?: string; type: "prompt"; message: string; attachments?: Attachment[] } + | { id?: string; type: "queue_message"; message: string } + | { id?: string; type: "abort" } + | { id?: string; type: "reset" } + + // State + | { id?: string; type: "get_state" } + + // Model + | { id?: string; type: "set_model"; provider: string; modelId: string } + | { id?: string; type: "cycle_model" } + | { id?: string; type: "get_available_models" } + + // Thinking + | { id?: string; type: "set_thinking_level"; level: ThinkingLevel } + | { id?: string; type: "cycle_thinking_level" } + + // Queue mode + | { id?: string; type: "set_queue_mode"; mode: "all" | "one-at-a-time" } + + // Compaction + | { id?: string; type: "compact"; customInstructions?: string } + | { id?: string; type: "set_auto_compaction"; enabled: boolean } + + // Bash + | { id?: string; type: "bash"; command: string } + | { id?: string; type: "abort_bash" } + + // Session + | { id?: string; type: "get_session_stats" } + | { id?: string; type: "export_html"; outputPath?: string } + | { id?: string; type: "switch_session"; sessionPath: string } + | { id?: string; type: "branch"; entryIndex: number } + | { id?: string; type: "get_branch_messages" } + | { id?: string; type: "get_last_assistant_text" }; + +// ============================================================================ +// RPC State +// ============================================================================ + +export interface RpcSessionState { + model: { provider: string; id: string; contextWindow: number } | null; + thinkingLevel: ThinkingLevel; + isStreaming: boolean; + queueMode: "all" | "one-at-a-time"; + sessionFile: string; + sessionId: string; + autoCompactionEnabled: boolean; + messageCount: number; + queuedMessageCount: number; +} + +// ============================================================================ +// RPC Responses (stdout) +// ============================================================================ + +// Success responses with data +export type RpcResponse = + // Prompting (async - events follow) + | { id?: string; type: "response"; command: "prompt"; success: true } + | { id?: string; type: "response"; command: "queue_message"; success: true } + | { id?: string; type: "response"; command: "abort"; success: true } + | { id?: string; type: "response"; command: "reset"; success: true } + + // State + | { id?: string; type: "response"; command: "get_state"; success: true; data: RpcSessionState } + + // Model + | { + id?: string; + type: "response"; + command: "set_model"; + success: true; + data: { provider: string; id: string }; + } + | { + id?: string; + type: "response"; + command: "cycle_model"; + success: true; + data: { model: { provider: string; id: string }; thinkingLevel: ThinkingLevel; isScoped: boolean } | null; + } + | { + id?: string; + type: "response"; + command: "get_available_models"; + success: true; + data: { models: Array<{ provider: string; id: string; contextWindow: number; reasoning: boolean }> }; + } + + // Thinking + | { id?: string; type: "response"; command: "set_thinking_level"; success: true } + | { + id?: string; + type: "response"; + command: "cycle_thinking_level"; + success: true; + data: { level: ThinkingLevel } | null; + } + + // Queue mode + | { id?: string; type: "response"; command: "set_queue_mode"; success: true } + + // Compaction + | { id?: string; type: "response"; command: "compact"; success: true; data: CompactionResult } + | { id?: string; type: "response"; command: "set_auto_compaction"; success: true } + + // Bash + | { id?: string; type: "response"; command: "bash"; success: true; data: BashResult } + | { id?: string; type: "response"; command: "abort_bash"; success: true } + + // Session + | { id?: string; type: "response"; command: "get_session_stats"; success: true; data: SessionStats } + | { id?: string; type: "response"; command: "export_html"; success: true; data: { path: string } } + | { id?: string; type: "response"; command: "switch_session"; success: true } + | { id?: string; type: "response"; command: "branch"; success: true; data: { text: string } } + | { + id?: string; + type: "response"; + command: "get_branch_messages"; + success: true; + data: { messages: Array<{ entryIndex: number; text: string }> }; + } + | { + id?: string; + type: "response"; + command: "get_last_assistant_text"; + success: true; + data: { text: string | null }; + } + + // Error response (any command can fail) + | { id?: string; type: "response"; command: string; success: false; error: string }; + +// ============================================================================ +// Helper type for extracting command types +// ============================================================================ + +export type RpcCommandType = RpcCommand["type"]; diff --git a/packages/coding-agent/test/rpc.test.ts b/packages/coding-agent/test/rpc.test.ts index d585a57a..7d13774f 100644 --- a/packages/coding-agent/test/rpc.test.ts +++ b/packages/coding-agent/test/rpc.test.ts @@ -1,130 +1,78 @@ -import { type ChildProcess, spawn } from "node:child_process"; import { existsSync, readdirSync, readFileSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { dirname, join } from "node:path"; -import * as readline from "node:readline"; import { fileURLToPath } from "node:url"; import type { AgentEvent } from "@mariozechner/pi-agent-core"; import { afterEach, beforeEach, describe, expect, test } from "vitest"; -import type { BashExecutionMessage } from "../src/core/messages.js"; -import type { CompactionEntry } from "../src/core/session-manager.js"; +import { RpcClient } from "../src/modes/rpc/rpc-client.js"; const __dirname = dirname(fileURLToPath(import.meta.url)); /** * RPC mode tests. - * Regression test for issue #83: https://github.com/badlogic/pi-mono/issues/83 */ describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_TOKEN)("RPC mode", () => { - let agent: ChildProcess; + let client: RpcClient; let sessionDir: string; beforeEach(() => { - // Create a unique temp directory for sessions sessionDir = join(tmpdir(), `pi-rpc-test-${Date.now()}`); + client = new RpcClient({ + cliPath: join(__dirname, "..", "dist", "cli.js"), + cwd: join(__dirname, ".."), + env: { PI_CODING_AGENT_DIR: sessionDir }, + provider: "anthropic", + model: "claude-sonnet-4-5", + }); }); - afterEach(() => { - // Kill the agent if still running - if (agent && !agent.killed) { - agent.kill("SIGKILL"); - } - // Clean up session directory + afterEach(async () => { + await client.stop(); if (sessionDir && existsSync(sessionDir)) { rmSync(sessionDir, { recursive: true }); } }); + test("should get state", async () => { + await client.start(); + const state = await client.getState(); + + expect(state.model).toBeDefined(); + expect(state.model?.provider).toBe("anthropic"); + expect(state.model?.id).toBe("claude-sonnet-4-5"); + expect(state.isStreaming).toBe(false); + expect(state.messageCount).toBe(0); + }, 30000); + test("should save messages to session file", async () => { - // Spawn agent in RPC mode with custom session directory - agent = spawn( - "node", - ["dist/cli.js", "--mode", "rpc", "--provider", "anthropic", "--model", "claude-sonnet-4-5"], - { - cwd: join(__dirname, ".."), - env: { - ...process.env, - PI_CODING_AGENT_DIR: sessionDir, - }, - }, - ); + await client.start(); - const events: AgentEvent[] = []; + // Send prompt and wait for completion + const events = await client.promptAndWait("Reply with just the word 'hello'"); - // Parse agent events - const rl = readline.createInterface({ input: agent.stdout!, terminal: false }); - - // Collect stderr for debugging - let stderr = ""; - agent.stderr?.on("data", (data) => { - stderr += data.toString(); - }); - - // Wait for agent_end which signals the full prompt/response cycle is complete - const waitForAgentEnd = new Promise((resolve, reject) => { - const timeout = setTimeout(() => reject(new Error("Timeout waiting for agent_end")), 60000); - - rl.on("line", (line: string) => { - try { - const event = JSON.parse(line) as AgentEvent; - events.push(event); - - // agent_end means the full prompt cycle completed (user msg + assistant response) - if (event.type === "agent_end") { - clearTimeout(timeout); - resolve(); - } - } catch { - // Ignore non-JSON lines - } - }); - - rl.on("close", () => { - clearTimeout(timeout); - reject(new Error("Agent stdout closed before agent_end")); - }); - }); - - // Send a simple prompt - the LLM will respond - agent.stdin!.write(JSON.stringify({ type: "prompt", message: "Reply with just the word 'hello'" }) + "\n"); - - // Wait for full prompt/response cycle to complete - await waitForAgentEnd; - - // Check that message_end events were emitted + // Should have message events const messageEndEvents = events.filter((e) => e.type === "message_end"); expect(messageEndEvents.length).toBeGreaterThanOrEqual(2); // user + assistant - // Wait a bit for file writes to complete + // Wait for file writes await new Promise((resolve) => setTimeout(resolve, 200)); - // Kill the agent gracefully - agent.kill("SIGTERM"); - - // Find and verify the session file + // Verify session file const sessionsPath = join(sessionDir, "sessions"); - expect(existsSync(sessionsPath), `Sessions path should exist: ${sessionsPath}. Stderr: ${stderr}`).toBe(true); + expect(existsSync(sessionsPath)).toBe(true); - // Find the session directory (it's based on cwd) const sessionDirs = readdirSync(sessionsPath); - expect(sessionDirs.length, `Should have at least one session dir. Stderr: ${stderr}`).toBeGreaterThan(0); + expect(sessionDirs.length).toBeGreaterThan(0); const cwdSessionDir = join(sessionsPath, sessionDirs[0]); - const allFiles = readdirSync(cwdSessionDir); - const sessionFiles = allFiles.filter((f) => f.endsWith(".jsonl")); - expect( - sessionFiles.length, - `Should have exactly one session file. Dir: ${cwdSessionDir}, Files: ${JSON.stringify(allFiles)}, Stderr: ${stderr}`, - ).toBe(1); + const sessionFiles = readdirSync(cwdSessionDir).filter((f) => f.endsWith(".jsonl")); + expect(sessionFiles.length).toBe(1); - // Read and verify session content const sessionContent = readFileSync(join(cwdSessionDir, sessionFiles[0]), "utf8"); - const lines = sessionContent.trim().split("\n"); - - // Should have session header and at least 2 messages (user + assistant) - expect(lines.length).toBeGreaterThanOrEqual(3); - - const entries = lines.map((line) => JSON.parse(line)); + const entries = sessionContent + .trim() + .split("\n") + .map((line) => JSON.parse(line)); // First entry should be session header expect(entries[0].type).toBe("session"); @@ -139,83 +87,20 @@ describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_T }, 90000); test("should handle manual compaction", async () => { - // Spawn agent in RPC mode - agent = spawn( - "node", - ["dist/cli.js", "--mode", "rpc", "--provider", "anthropic", "--model", "claude-sonnet-4-5"], - { - cwd: join(__dirname, ".."), - env: { - ...process.env, - PI_CODING_AGENT_DIR: sessionDir, - }, - }, - ); + await client.start(); - const events: (AgentEvent | CompactionEntry | { type: "error"; error: string })[] = []; + // First send a prompt to have messages to compact + await client.promptAndWait("Say hello"); - const rl = readline.createInterface({ input: agent.stdout!, terminal: false }); - - let stderr = ""; - agent.stderr?.on("data", (data) => { - stderr += data.toString(); - }); - - // Helper to wait for a specific event type - const waitForEvent = (eventType: string, timeout = 60000) => - new Promise((resolve, reject) => { - const timer = setTimeout(() => reject(new Error(`Timeout waiting for ${eventType}`)), timeout); - - const checkExisting = () => { - if (events.some((e) => e.type === eventType)) { - clearTimeout(timer); - resolve(); - return true; - } - return false; - }; - - if (checkExisting()) return; - - const handler = (line: string) => { - try { - const event = JSON.parse(line); - events.push(event); - if (event.type === eventType) { - clearTimeout(timer); - rl.off("line", handler); - resolve(); - } - } catch { - // Ignore non-JSON - } - }; - rl.on("line", handler); - }); - - // First, send a prompt to have some messages to compact - agent.stdin!.write(JSON.stringify({ type: "prompt", message: "Say hello" }) + "\n"); - await waitForEvent("agent_end"); - - // Clear events to focus on compaction - events.length = 0; - - // Send compact command - agent.stdin!.write(JSON.stringify({ type: "compact" }) + "\n"); - await waitForEvent("compaction"); - - // Verify compaction event - const compactionEvent = events.find((e) => e.type === "compaction") as CompactionEntry | undefined; - expect(compactionEvent).toBeDefined(); - expect(compactionEvent!.summary).toBeDefined(); - expect(compactionEvent!.tokensBefore).toBeGreaterThan(0); + // Compact + const result = await client.compact(); + expect(result.summary).toBeDefined(); + expect(result.tokensBefore).toBeGreaterThan(0); // Wait for file writes await new Promise((resolve) => setTimeout(resolve, 200)); - agent.kill("SIGTERM"); - - // Verify compaction was saved to session file + // Verify compaction in session file const sessionsPath = join(sessionDir, "sessions"); const sessionDirs = readdirSync(sessionsPath); const cwdSessionDir = join(sessionsPath, sessionDirs[0]); @@ -226,97 +111,34 @@ describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_T .split("\n") .map((line) => JSON.parse(line)); - // Should have a compaction entry const compactionEntries = entries.filter((e: { type: string }) => e.type === "compaction"); expect(compactionEntries.length).toBe(1); expect(compactionEntries[0].summary).toBeDefined(); }, 120000); - test("should execute bash command and add to context", async () => { - // Spawn agent in RPC mode - agent = spawn( - "node", - ["dist/cli.js", "--mode", "rpc", "--provider", "anthropic", "--model", "claude-sonnet-4-5"], - { - cwd: join(__dirname, ".."), - env: { - ...process.env, - PI_CODING_AGENT_DIR: sessionDir, - }, - }, - ); + test("should execute bash command", async () => { + await client.start(); - const events: ( - | AgentEvent - | { type: "bash_end"; message: BashExecutionMessage } - | { type: "error"; error: string } - )[] = []; + const result = await client.bash("echo hello"); + expect(result.output.trim()).toBe("hello"); + expect(result.exitCode).toBe(0); + expect(result.cancelled).toBe(false); + }, 30000); - const rl = readline.createInterface({ input: agent.stdout!, terminal: false }); + test("should add bash output to context", async () => { + await client.start(); - let stderr = ""; - agent.stderr?.on("data", (data) => { - stderr += data.toString(); - }); + // First send a prompt to initialize session + await client.promptAndWait("Say hi"); - // Set up persistent event collector BEFORE sending any commands - // This is critical for fast commands like bash that complete before - // a per-call handler would be registered - rl.on("line", (line: string) => { - try { - const event = JSON.parse(line); - events.push(event); - } catch { - // Ignore non-JSON - } - }); - - // Helper to wait for a specific event type by polling collected events - const waitForEvent = (eventType: string, timeout = 60000) => - new Promise((resolve, reject) => { - const timer = setTimeout( - () => reject(new Error(`Timeout waiting for ${eventType}. Stderr: ${stderr}`)), - timeout, - ); - const check = () => { - if (events.some((e) => e.type === eventType)) { - clearTimeout(timer); - resolve(); - } else { - setTimeout(check, 50); - } - }; - check(); - }); - - // Send a bash command - agent.stdin!.write(JSON.stringify({ type: "bash", command: "echo hello" }) + "\n"); - await waitForEvent("bash_end"); - - // Verify bash_end event - const bashEvent = events.find((e) => e.type === "bash_end") as - | { type: "bash_end"; message: BashExecutionMessage } - | undefined; - expect(bashEvent).toBeDefined(); - expect(bashEvent!.message.role).toBe("bashExecution"); - expect(bashEvent!.message.command).toBe("echo hello"); - expect(bashEvent!.message.output.trim()).toBe("hello"); - expect(bashEvent!.message.exitCode).toBe(0); - expect(bashEvent!.message.cancelled).toBe(false); - - // Clear events for next phase - events.length = 0; - - // Session only initializes after user+assistant exchange, so send a prompt - agent.stdin!.write(JSON.stringify({ type: "prompt", message: "Say hi" }) + "\n"); - await waitForEvent("agent_end"); + // Run bash command + const uniqueValue = `test-${Date.now()}`; + await client.bash(`echo ${uniqueValue}`); // Wait for file writes await new Promise((resolve) => setTimeout(resolve, 200)); - agent.kill("SIGTERM"); - - // Verify bash execution was saved to session file + // Verify bash message in session const sessionsPath = join(sessionDir, "sessions"); const sessionDirs = readdirSync(sessionsPath); const cwdSessionDir = join(sessionsPath, sessionDirs[0]); @@ -327,92 +149,27 @@ describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_T .split("\n") .map((line) => JSON.parse(line)); - // Should have a bashExecution message const bashMessages = entries.filter( (e: { type: string; message?: { role: string } }) => e.type === "message" && e.message?.role === "bashExecution", ); expect(bashMessages.length).toBe(1); - expect(bashMessages[0].message.command).toBe("echo hello"); - expect(bashMessages[0].message.output.trim()).toBe("hello"); + expect(bashMessages[0].message.output).toContain(uniqueValue); }, 90000); test("should include bash output in LLM context", async () => { - // Spawn agent in RPC mode - agent = spawn( - "node", - ["dist/cli.js", "--mode", "rpc", "--provider", "anthropic", "--model", "claude-sonnet-4-5"], - { - cwd: join(__dirname, ".."), - env: { - ...process.env, - PI_CODING_AGENT_DIR: sessionDir, - }, - }, + await client.start(); + + // Run a bash command with a unique value + const uniqueValue = `unique-${Date.now()}`; + await client.bash(`echo ${uniqueValue}`); + + // Ask the LLM what the output was + const events = await client.promptAndWait( + "What was the exact output of the echo command I just ran? Reply with just the value, nothing else.", ); - const events: ( - | AgentEvent - | { type: "bash_end"; message: BashExecutionMessage } - | { type: "error"; error: string } - )[] = []; - - const rl = readline.createInterface({ input: agent.stdout!, terminal: false }); - - let stderr = ""; - agent.stderr?.on("data", (data) => { - stderr += data.toString(); - }); - - // Set up persistent event collector BEFORE sending any commands - rl.on("line", (line: string) => { - try { - const event = JSON.parse(line); - events.push(event); - } catch { - // Ignore non-JSON - } - }); - - // Helper to wait for a specific event type by polling collected events - const waitForEvent = (eventType: string, timeout = 60000) => - new Promise((resolve, reject) => { - const timer = setTimeout( - () => reject(new Error(`Timeout waiting for ${eventType}. Stderr: ${stderr}`)), - timeout, - ); - const check = () => { - if (events.some((e) => e.type === eventType)) { - clearTimeout(timer); - resolve(); - } else { - setTimeout(check, 50); - } - }; - check(); - }); - - // Wait for agent to initialize (session manager, etc.) - await new Promise((resolve) => setTimeout(resolve, 500)); - - // First, run a bash command with a unique value - const uniqueValue = `test-${Date.now()}`; - agent.stdin!.write(JSON.stringify({ type: "bash", command: `echo ${uniqueValue}` }) + "\n"); - await waitForEvent("bash_end"); - - // Clear events but keep collecting new ones - events.length = 0; - - // Now ask the LLM what the output was - it should be in context - agent.stdin!.write( - JSON.stringify({ - type: "prompt", - message: `What was the exact output of the echo command I just ran? Reply with just the value, nothing else.`, - }) + "\n", - ); - await waitForEvent("agent_end"); - - // Find the assistant's response + // Find assistant's response const messageEndEvents = events.filter((e) => e.type === "message_end") as AgentEvent[]; const assistantMessage = messageEndEvents.find( (e) => e.type === "message_end" && (e as any).message?.role === "assistant", @@ -420,10 +177,109 @@ describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_T expect(assistantMessage).toBeDefined(); - // The assistant should mention the unique value from the bash output const textContent = assistantMessage.message.content.find((c: any) => c.type === "text"); expect(textContent?.text).toContain(uniqueValue); + }, 90000); - agent.kill("SIGTERM"); + test("should set and get thinking level", async () => { + await client.start(); + + // Set thinking level + await client.setThinkingLevel("high"); + + // Verify via state + const state = await client.getState(); + expect(state.thinkingLevel).toBe("high"); + }, 30000); + + test("should cycle thinking level", async () => { + await client.start(); + + // Get initial level + const initialState = await client.getState(); + const initialLevel = initialState.thinkingLevel; + + // Cycle + const result = await client.cycleThinkingLevel(); + expect(result).toBeDefined(); + expect(result!.level).not.toBe(initialLevel); + + // Verify via state + const newState = await client.getState(); + expect(newState.thinkingLevel).toBe(result!.level); + }, 30000); + + test("should get available models", async () => { + await client.start(); + + const models = await client.getAvailableModels(); + expect(models.length).toBeGreaterThan(0); + + // All models should have required fields + for (const model of models) { + expect(model.provider).toBeDefined(); + expect(model.id).toBeDefined(); + expect(model.contextWindow).toBeGreaterThan(0); + expect(typeof model.reasoning).toBe("boolean"); + } + }, 30000); + + test("should get session stats", async () => { + await client.start(); + + // Send a prompt first + await client.promptAndWait("Hello"); + + const stats = await client.getSessionStats(); + expect(stats.sessionFile).toBeDefined(); + expect(stats.sessionId).toBeDefined(); + expect(stats.userMessages).toBeGreaterThanOrEqual(1); + expect(stats.assistantMessages).toBeGreaterThanOrEqual(1); + }, 90000); + + test("should reset session", async () => { + await client.start(); + + // Send a prompt + await client.promptAndWait("Hello"); + + // Verify messages exist + let state = await client.getState(); + expect(state.messageCount).toBeGreaterThan(0); + + // Reset + await client.reset(); + + // Verify messages cleared + state = await client.getState(); + expect(state.messageCount).toBe(0); + }, 90000); + + test("should export to HTML", async () => { + await client.start(); + + // Send a prompt first + await client.promptAndWait("Hello"); + + // Export + const result = await client.exportHtml(); + expect(result.path).toBeDefined(); + expect(result.path.endsWith(".html")).toBe(true); + expect(existsSync(result.path)).toBe(true); + }, 90000); + + test("should get last assistant text", async () => { + await client.start(); + + // Initially null + let text = await client.getLastAssistantText(); + expect(text).toBeNull(); + + // Send prompt + await client.promptAndWait("Reply with just: test123"); + + // Should have text now + text = await client.getLastAssistantText(); + expect(text).toContain("test123"); }, 90000); });