Rewrite RPC mode with typed protocol and client

- Move RPC files to modes/rpc/ directory
- Add properly typed RpcCommand and RpcResponse types
- Expose full AgentSession API via RPC commands:
  - State: get_state
  - Model: set_model, cycle_model, get_available_models
  - Thinking: set_thinking_level, cycle_thinking_level
  - Queue: set_queue_mode
  - Compaction: compact, set_auto_compaction
  - Bash: bash, abort_bash
  - Session: get_session_stats, export_html, switch_session, branch, etc.
- Add RpcClient class for programmatic access
- Rewrite tests to use RpcClient instead of raw process spawning
- All commands support optional correlation ID for request/response matching
This commit is contained in:
Mario Zechner 2025-12-09 14:13:28 +01:00
parent b2e1054e5e
commit 3559a43ba0
7 changed files with 1039 additions and 401 deletions

View file

@ -22,6 +22,8 @@ read README.md, then ask which module(s) to work on. Based on the answer, read t
- NEVER commit unless user asks
## GitHub Issues
When reading issues:
- Always read all comments on the issue
When creating issues:
- Add `pkg:*` labels to indicate which package(s) the issue affects

View file

@ -4,5 +4,6 @@
export { InteractiveMode } from "./interactive/interactive-mode.js";
export { runPrintMode } from "./print-mode.js";
export { runRpcMode } from "./rpc-mode.js";
// InteractiveMode will be added in WP15
export { type ModelInfo, RpcClient, type RpcClientOptions, type RpcEventListener } from "./rpc/rpc-client.js";
export { runRpcMode } from "./rpc/rpc-mode.js";
export type { RpcCommand, RpcResponse, RpcSessionState } from "./rpc/rpc-types.js";

View file

@ -1,84 +0,0 @@
/**
* RPC mode: Headless operation with JSON stdin/stdout protocol.
*
* Used for embedding the agent in other applications.
* Receives commands as JSON on stdin, outputs events as JSON on stdout.
*/
import * as readline from "readline";
import type { AgentSession } from "../core/agent-session.js";
/**
* Run in RPC mode.
* Listens for JSON commands on stdin, outputs events on stdout.
*
* Commands:
* - { type: "prompt", message: string, attachments?: Attachment[] }
* - { type: "abort" }
* - { type: "compact", customInstructions?: string }
* - { type: "bash", command: string }
*
* Events are output as JSON lines (same format as session manager).
*/
export async function runRpcMode(session: AgentSession): Promise<never> {
// Output all agent events as JSON
session.subscribe((event) => {
console.log(JSON.stringify(event));
});
// Listen for JSON input
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
rl.on("line", async (line: string) => {
try {
const input = JSON.parse(line);
switch (input.type) {
case "prompt":
if (input.message) {
await session.prompt(input.message, {
attachments: input.attachments,
expandSlashCommands: false, // RPC mode doesn't expand slash commands
});
}
break;
case "abort":
await session.abort();
break;
case "compact":
try {
const result = await session.compact(input.customInstructions);
console.log(JSON.stringify({ type: "compaction", ...result }));
} catch (error: any) {
console.log(JSON.stringify({ type: "error", error: `Compaction failed: ${error.message}` }));
}
break;
case "bash":
if (input.command) {
try {
const result = await session.executeBash(input.command);
console.log(JSON.stringify({ type: "bash_end", ...result }));
} catch (error: any) {
console.log(JSON.stringify({ type: "error", error: `Bash failed: ${error.message}` }));
}
}
break;
default:
console.log(JSON.stringify({ type: "error", error: `Unknown command: ${input.type}` }));
}
} catch (error: any) {
console.log(JSON.stringify({ type: "error", error: error.message }));
}
});
// Keep process alive forever
return new Promise(() => {});
}

View file

@ -0,0 +1,451 @@
/**
* RPC Client for programmatic access to the coding agent.
*
* Spawns the agent in RPC mode and provides a typed API for all operations.
*/
import { type ChildProcess, spawn } from "node:child_process";
import * as readline from "node:readline";
import type { AgentEvent, Attachment, ThinkingLevel } from "@mariozechner/pi-agent-core";
import type { CompactionResult, SessionStats } from "../../core/agent-session.js";
import type { BashResult } from "../../core/bash-executor.js";
import type { RpcCommand, RpcResponse, RpcSessionState } from "./rpc-types.js";
// ============================================================================
// Types
// ============================================================================
/** Distributive Omit that works with union types */
type DistributiveOmit<T, K extends keyof T> = T extends unknown ? Omit<T, K> : never;
/** RpcCommand without the id field (for internal send) */
type RpcCommandBody = DistributiveOmit<RpcCommand, "id">;
export interface RpcClientOptions {
/** Path to the CLI entry point (default: searches for dist/cli.js) */
cliPath?: string;
/** Working directory for the agent */
cwd?: string;
/** Environment variables */
env?: Record<string, string>;
/** Provider to use */
provider?: string;
/** Model ID to use */
model?: string;
/** Additional CLI arguments */
args?: string[];
}
export interface ModelInfo {
provider: string;
id: string;
contextWindow: number;
reasoning: boolean;
}
export type RpcEventListener = (event: AgentEvent) => void;
// ============================================================================
// RPC Client
// ============================================================================
export class RpcClient {
private process: ChildProcess | null = null;
private rl: readline.Interface | null = null;
private eventListeners: RpcEventListener[] = [];
private pendingRequests: Map<string, { resolve: (response: RpcResponse) => void; reject: (error: Error) => void }> =
new Map();
private requestId = 0;
private stderr = "";
constructor(private options: RpcClientOptions = {}) {}
/**
* Start the RPC agent process.
*/
async start(): Promise<void> {
if (this.process) {
throw new Error("Client already started");
}
const cliPath = this.options.cliPath ?? "dist/cli.js";
const args = ["--mode", "rpc"];
if (this.options.provider) {
args.push("--provider", this.options.provider);
}
if (this.options.model) {
args.push("--model", this.options.model);
}
if (this.options.args) {
args.push(...this.options.args);
}
this.process = spawn("node", [cliPath, ...args], {
cwd: this.options.cwd,
env: { ...process.env, ...this.options.env },
stdio: ["pipe", "pipe", "pipe"],
});
// Collect stderr for debugging
this.process.stderr?.on("data", (data) => {
this.stderr += data.toString();
});
// Set up line reader for stdout
this.rl = readline.createInterface({
input: this.process.stdout!,
terminal: false,
});
this.rl.on("line", (line) => {
this.handleLine(line);
});
// Wait a moment for process to initialize
await new Promise((resolve) => setTimeout(resolve, 100));
if (this.process.exitCode !== null) {
throw new Error(`Agent process exited immediately with code ${this.process.exitCode}. Stderr: ${this.stderr}`);
}
}
/**
* Stop the RPC agent process.
*/
async stop(): Promise<void> {
if (!this.process) return;
this.rl?.close();
this.process.kill("SIGTERM");
// Wait for process to exit
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
this.process?.kill("SIGKILL");
resolve();
}, 1000);
this.process?.on("exit", () => {
clearTimeout(timeout);
resolve();
});
});
this.process = null;
this.rl = null;
this.pendingRequests.clear();
}
/**
* Subscribe to agent events.
*/
onEvent(listener: RpcEventListener): () => void {
this.eventListeners.push(listener);
return () => {
const index = this.eventListeners.indexOf(listener);
if (index !== -1) {
this.eventListeners.splice(index, 1);
}
};
}
/**
* Get collected stderr output (useful for debugging).
*/
getStderr(): string {
return this.stderr;
}
// =========================================================================
// Command Methods
// =========================================================================
/**
* Send a prompt to the agent.
* Returns immediately after sending; use onEvent() to receive streaming events.
* Use waitForIdle() to wait for completion.
*/
async prompt(message: string, attachments?: Attachment[]): Promise<void> {
await this.send({ type: "prompt", message, attachments });
}
/**
* Queue a message while agent is streaming.
*/
async queueMessage(message: string): Promise<void> {
await this.send({ type: "queue_message", message });
}
/**
* Abort current operation.
*/
async abort(): Promise<void> {
await this.send({ type: "abort" });
}
/**
* Reset session (clear all messages).
*/
async reset(): Promise<void> {
await this.send({ type: "reset" });
}
/**
* Get current session state.
*/
async getState(): Promise<RpcSessionState> {
const response = await this.send({ type: "get_state" });
return this.getData(response);
}
/**
* Set model by provider and ID.
*/
async setModel(provider: string, modelId: string): Promise<{ provider: string; id: string }> {
const response = await this.send({ type: "set_model", provider, modelId });
return this.getData(response);
}
/**
* Cycle to next model.
*/
async cycleModel(): Promise<{
model: { provider: string; id: string };
thinkingLevel: ThinkingLevel;
isScoped: boolean;
} | null> {
const response = await this.send({ type: "cycle_model" });
return this.getData(response);
}
/**
* Get list of available models.
*/
async getAvailableModels(): Promise<ModelInfo[]> {
const response = await this.send({ type: "get_available_models" });
return this.getData<{ models: ModelInfo[] }>(response).models;
}
/**
* Set thinking level.
*/
async setThinkingLevel(level: ThinkingLevel): Promise<void> {
await this.send({ type: "set_thinking_level", level });
}
/**
* Cycle thinking level.
*/
async cycleThinkingLevel(): Promise<{ level: ThinkingLevel } | null> {
const response = await this.send({ type: "cycle_thinking_level" });
return this.getData(response);
}
/**
* Set queue mode.
*/
async setQueueMode(mode: "all" | "one-at-a-time"): Promise<void> {
await this.send({ type: "set_queue_mode", mode });
}
/**
* Compact session context.
*/
async compact(customInstructions?: string): Promise<CompactionResult> {
const response = await this.send({ type: "compact", customInstructions });
return this.getData(response);
}
/**
* Set auto-compaction enabled/disabled.
*/
async setAutoCompaction(enabled: boolean): Promise<void> {
await this.send({ type: "set_auto_compaction", enabled });
}
/**
* Execute a bash command.
*/
async bash(command: string): Promise<BashResult> {
const response = await this.send({ type: "bash", command });
return this.getData(response);
}
/**
* Abort running bash command.
*/
async abortBash(): Promise<void> {
await this.send({ type: "abort_bash" });
}
/**
* Get session statistics.
*/
async getSessionStats(): Promise<SessionStats> {
const response = await this.send({ type: "get_session_stats" });
return this.getData(response);
}
/**
* Export session to HTML.
*/
async exportHtml(outputPath?: string): Promise<{ path: string }> {
const response = await this.send({ type: "export_html", outputPath });
return this.getData(response);
}
/**
* Switch to a different session file.
*/
async switchSession(sessionPath: string): Promise<void> {
await this.send({ type: "switch_session", sessionPath });
}
/**
* Branch from a specific message.
*/
async branch(entryIndex: number): Promise<{ text: string }> {
const response = await this.send({ type: "branch", entryIndex });
return this.getData(response);
}
/**
* Get messages available for branching.
*/
async getBranchMessages(): Promise<Array<{ entryIndex: number; text: string }>> {
const response = await this.send({ type: "get_branch_messages" });
return this.getData<{ messages: Array<{ entryIndex: number; text: string }> }>(response).messages;
}
/**
* Get text of last assistant message.
*/
async getLastAssistantText(): Promise<string | null> {
const response = await this.send({ type: "get_last_assistant_text" });
return this.getData<{ text: string | null }>(response).text;
}
// =========================================================================
// Helpers
// =========================================================================
/**
* Wait for agent to become idle (no streaming).
* Resolves when agent_end event is received.
*/
waitForIdle(timeout = 60000): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
unsubscribe();
reject(new Error(`Timeout waiting for agent to become idle. Stderr: ${this.stderr}`));
}, timeout);
const unsubscribe = this.onEvent((event) => {
if (event.type === "agent_end") {
clearTimeout(timer);
unsubscribe();
resolve();
}
});
});
}
/**
* Collect events until agent becomes idle.
*/
collectEvents(timeout = 60000): Promise<AgentEvent[]> {
return new Promise((resolve, reject) => {
const events: AgentEvent[] = [];
const timer = setTimeout(() => {
unsubscribe();
reject(new Error(`Timeout collecting events. Stderr: ${this.stderr}`));
}, timeout);
const unsubscribe = this.onEvent((event) => {
events.push(event);
if (event.type === "agent_end") {
clearTimeout(timer);
unsubscribe();
resolve(events);
}
});
});
}
/**
* Send prompt and wait for completion, returning all events.
*/
async promptAndWait(message: string, attachments?: Attachment[], timeout = 60000): Promise<AgentEvent[]> {
const eventsPromise = this.collectEvents(timeout);
await this.prompt(message, attachments);
return eventsPromise;
}
// =========================================================================
// Internal
// =========================================================================
private handleLine(line: string): void {
try {
const data = JSON.parse(line);
// Check if it's a response to a pending request
if (data.type === "response" && data.id && this.pendingRequests.has(data.id)) {
const pending = this.pendingRequests.get(data.id)!;
this.pendingRequests.delete(data.id);
pending.resolve(data as RpcResponse);
return;
}
// Otherwise it's an event
for (const listener of this.eventListeners) {
listener(data as AgentEvent);
}
} catch {
// Ignore non-JSON lines
}
}
private async send(command: RpcCommandBody): Promise<RpcResponse> {
if (!this.process?.stdin) {
throw new Error("Client not started");
}
const id = `req_${++this.requestId}`;
const fullCommand = { ...command, id } as RpcCommand;
return new Promise((resolve, reject) => {
this.pendingRequests.set(id, { resolve, reject });
const timeout = setTimeout(() => {
this.pendingRequests.delete(id);
reject(new Error(`Timeout waiting for response to ${command.type}. Stderr: ${this.stderr}`));
}, 30000);
this.pendingRequests.set(id, {
resolve: (response) => {
clearTimeout(timeout);
resolve(response);
},
reject: (error) => {
clearTimeout(timeout);
reject(error);
},
});
this.process!.stdin!.write(JSON.stringify(fullCommand) + "\n");
});
}
private getData<T>(response: RpcResponse): T {
if (!response.success) {
const errorResponse = response as Extract<RpcResponse, { success: false }>;
throw new Error(errorResponse.error);
}
// Type assertion: we trust response.data matches T based on the command sent.
// This is safe because each public method specifies the correct T for its command.
const successResponse = response as Extract<RpcResponse, { success: true; data: unknown }>;
return successResponse.data as T;
}
}

View file

@ -0,0 +1,256 @@
/**
* RPC mode: Headless operation with JSON stdin/stdout protocol.
*
* Used for embedding the agent in other applications.
* Receives commands as JSON on stdin, outputs events and responses as JSON on stdout.
*
* Protocol:
* - Commands: JSON objects with `type` field, optional `id` for correlation
* - Responses: JSON objects with `type: "response"`, `command`, `success`, and optional `data`/`error`
* - Events: AgentSessionEvent objects streamed as they occur
*/
import * as readline from "readline";
import type { AgentSession } from "../../core/agent-session.js";
import type { RpcCommand, RpcResponse, RpcSessionState } from "./rpc-types.js";
// Re-export types for consumers
export type { RpcCommand, RpcResponse, RpcSessionState } from "./rpc-types.js";
/**
* Run in RPC mode.
* Listens for JSON commands on stdin, outputs events and responses on stdout.
*/
export async function runRpcMode(session: AgentSession): Promise<never> {
const output = (obj: RpcResponse | object) => {
console.log(JSON.stringify(obj));
};
const success = <T extends RpcCommand["type"]>(
id: string | undefined,
command: T,
data?: object | null,
): RpcResponse => {
if (data === undefined) {
return { id, type: "response", command, success: true } as RpcResponse;
}
return { id, type: "response", command, success: true, data } as RpcResponse;
};
const error = (id: string | undefined, command: string, message: string): RpcResponse => {
return { id, type: "response", command, success: false, error: message };
};
// Output all agent events as JSON
session.subscribe((event) => {
output(event);
});
// Handle a single command
const handleCommand = async (command: RpcCommand): Promise<RpcResponse> => {
const id = command.id;
switch (command.type) {
// =================================================================
// Prompting
// =================================================================
case "prompt": {
// Don't await - events will stream
session
.prompt(command.message, {
attachments: command.attachments,
expandSlashCommands: false,
})
.catch((e) => output(error(id, "prompt", e.message)));
return success(id, "prompt");
}
case "queue_message": {
await session.queueMessage(command.message);
return success(id, "queue_message");
}
case "abort": {
await session.abort();
return success(id, "abort");
}
case "reset": {
await session.reset();
return success(id, "reset");
}
// =================================================================
// State
// =================================================================
case "get_state": {
const model = session.model;
const state: RpcSessionState = {
model: model ? { provider: model.provider, id: model.id, contextWindow: model.contextWindow } : null,
thinkingLevel: session.thinkingLevel,
isStreaming: session.isStreaming,
queueMode: session.queueMode,
sessionFile: session.sessionFile,
sessionId: session.sessionId,
autoCompactionEnabled: session.autoCompactionEnabled,
messageCount: session.messages.length,
queuedMessageCount: session.queuedMessageCount,
};
return success(id, "get_state", state);
}
// =================================================================
// Model
// =================================================================
case "set_model": {
const models = await session.getAvailableModels();
const model = models.find((m) => m.provider === command.provider && m.id === command.modelId);
if (!model) {
return error(id, "set_model", `Model not found: ${command.provider}/${command.modelId}`);
}
await session.setModel(model);
return success(id, "set_model", { provider: model.provider, id: model.id });
}
case "cycle_model": {
const result = await session.cycleModel();
if (!result) {
return success(id, "cycle_model", null);
}
return success(id, "cycle_model", {
model: { provider: result.model.provider, id: result.model.id },
thinkingLevel: result.thinkingLevel,
isScoped: result.isScoped,
});
}
case "get_available_models": {
const models = await session.getAvailableModels();
return success(id, "get_available_models", {
models: models.map((m) => ({
provider: m.provider,
id: m.id,
contextWindow: m.contextWindow,
reasoning: !!m.reasoning,
})),
});
}
// =================================================================
// Thinking
// =================================================================
case "set_thinking_level": {
session.setThinkingLevel(command.level);
return success(id, "set_thinking_level");
}
case "cycle_thinking_level": {
const level = session.cycleThinkingLevel();
if (!level) {
return success(id, "cycle_thinking_level", null);
}
return success(id, "cycle_thinking_level", { level });
}
// =================================================================
// Queue Mode
// =================================================================
case "set_queue_mode": {
session.setQueueMode(command.mode);
return success(id, "set_queue_mode");
}
// =================================================================
// Compaction
// =================================================================
case "compact": {
const result = await session.compact(command.customInstructions);
return success(id, "compact", result);
}
case "set_auto_compaction": {
session.setAutoCompactionEnabled(command.enabled);
return success(id, "set_auto_compaction");
}
// =================================================================
// Bash
// =================================================================
case "bash": {
const result = await session.executeBash(command.command);
return success(id, "bash", result);
}
case "abort_bash": {
session.abortBash();
return success(id, "abort_bash");
}
// =================================================================
// Session
// =================================================================
case "get_session_stats": {
const stats = session.getSessionStats();
return success(id, "get_session_stats", stats);
}
case "export_html": {
const path = session.exportToHtml(command.outputPath);
return success(id, "export_html", { path });
}
case "switch_session": {
await session.switchSession(command.sessionPath);
return success(id, "switch_session");
}
case "branch": {
const text = session.branch(command.entryIndex);
return success(id, "branch", { text });
}
case "get_branch_messages": {
const messages = session.getUserMessagesForBranching();
return success(id, "get_branch_messages", { messages });
}
case "get_last_assistant_text": {
const text = session.getLastAssistantText();
return success(id, "get_last_assistant_text", { text });
}
default: {
const unknownCommand = command as { type: string };
return error(undefined, unknownCommand.type, `Unknown command: ${unknownCommand.type}`);
}
}
};
// Listen for JSON input
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
rl.on("line", async (line: string) => {
try {
const command = JSON.parse(line) as RpcCommand;
const response = await handleCommand(command);
output(response);
} catch (e: any) {
output(error(undefined, "parse", `Failed to parse command: ${e.message}`));
}
});
// Keep process alive forever
return new Promise(() => {});
}

View file

@ -0,0 +1,156 @@
/**
* RPC protocol types for headless operation.
*
* Commands are sent as JSON lines on stdin.
* Responses and events are emitted as JSON lines on stdout.
*/
import type { Attachment, ThinkingLevel } from "@mariozechner/pi-agent-core";
import type { CompactionResult, SessionStats } from "../../core/agent-session.js";
import type { BashResult } from "../../core/bash-executor.js";
// ============================================================================
// RPC Commands (stdin)
// ============================================================================
export type RpcCommand =
// Prompting
| { id?: string; type: "prompt"; message: string; attachments?: Attachment[] }
| { id?: string; type: "queue_message"; message: string }
| { id?: string; type: "abort" }
| { id?: string; type: "reset" }
// State
| { id?: string; type: "get_state" }
// Model
| { id?: string; type: "set_model"; provider: string; modelId: string }
| { id?: string; type: "cycle_model" }
| { id?: string; type: "get_available_models" }
// Thinking
| { id?: string; type: "set_thinking_level"; level: ThinkingLevel }
| { id?: string; type: "cycle_thinking_level" }
// Queue mode
| { id?: string; type: "set_queue_mode"; mode: "all" | "one-at-a-time" }
// Compaction
| { id?: string; type: "compact"; customInstructions?: string }
| { id?: string; type: "set_auto_compaction"; enabled: boolean }
// Bash
| { id?: string; type: "bash"; command: string }
| { id?: string; type: "abort_bash" }
// Session
| { id?: string; type: "get_session_stats" }
| { id?: string; type: "export_html"; outputPath?: string }
| { id?: string; type: "switch_session"; sessionPath: string }
| { id?: string; type: "branch"; entryIndex: number }
| { id?: string; type: "get_branch_messages" }
| { id?: string; type: "get_last_assistant_text" };
// ============================================================================
// RPC State
// ============================================================================
export interface RpcSessionState {
model: { provider: string; id: string; contextWindow: number } | null;
thinkingLevel: ThinkingLevel;
isStreaming: boolean;
queueMode: "all" | "one-at-a-time";
sessionFile: string;
sessionId: string;
autoCompactionEnabled: boolean;
messageCount: number;
queuedMessageCount: number;
}
// ============================================================================
// RPC Responses (stdout)
// ============================================================================
// Success responses with data
export type RpcResponse =
// Prompting (async - events follow)
| { id?: string; type: "response"; command: "prompt"; success: true }
| { id?: string; type: "response"; command: "queue_message"; success: true }
| { id?: string; type: "response"; command: "abort"; success: true }
| { id?: string; type: "response"; command: "reset"; success: true }
// State
| { id?: string; type: "response"; command: "get_state"; success: true; data: RpcSessionState }
// Model
| {
id?: string;
type: "response";
command: "set_model";
success: true;
data: { provider: string; id: string };
}
| {
id?: string;
type: "response";
command: "cycle_model";
success: true;
data: { model: { provider: string; id: string }; thinkingLevel: ThinkingLevel; isScoped: boolean } | null;
}
| {
id?: string;
type: "response";
command: "get_available_models";
success: true;
data: { models: Array<{ provider: string; id: string; contextWindow: number; reasoning: boolean }> };
}
// Thinking
| { id?: string; type: "response"; command: "set_thinking_level"; success: true }
| {
id?: string;
type: "response";
command: "cycle_thinking_level";
success: true;
data: { level: ThinkingLevel } | null;
}
// Queue mode
| { id?: string; type: "response"; command: "set_queue_mode"; success: true }
// Compaction
| { id?: string; type: "response"; command: "compact"; success: true; data: CompactionResult }
| { id?: string; type: "response"; command: "set_auto_compaction"; success: true }
// Bash
| { id?: string; type: "response"; command: "bash"; success: true; data: BashResult }
| { id?: string; type: "response"; command: "abort_bash"; success: true }
// Session
| { id?: string; type: "response"; command: "get_session_stats"; success: true; data: SessionStats }
| { id?: string; type: "response"; command: "export_html"; success: true; data: { path: string } }
| { id?: string; type: "response"; command: "switch_session"; success: true }
| { id?: string; type: "response"; command: "branch"; success: true; data: { text: string } }
| {
id?: string;
type: "response";
command: "get_branch_messages";
success: true;
data: { messages: Array<{ entryIndex: number; text: string }> };
}
| {
id?: string;
type: "response";
command: "get_last_assistant_text";
success: true;
data: { text: string | null };
}
// Error response (any command can fail)
| { id?: string; type: "response"; command: string; success: false; error: string };
// ============================================================================
// Helper type for extracting command types
// ============================================================================
export type RpcCommandType = RpcCommand["type"];

View file

@ -1,130 +1,78 @@
import { type ChildProcess, spawn } from "node:child_process";
import { existsSync, readdirSync, readFileSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import * as readline from "node:readline";
import { fileURLToPath } from "node:url";
import type { AgentEvent } from "@mariozechner/pi-agent-core";
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import type { BashExecutionMessage } from "../src/core/messages.js";
import type { CompactionEntry } from "../src/core/session-manager.js";
import { RpcClient } from "../src/modes/rpc/rpc-client.js";
const __dirname = dirname(fileURLToPath(import.meta.url));
/**
* RPC mode tests.
* Regression test for issue #83: https://github.com/badlogic/pi-mono/issues/83
*/
describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_TOKEN)("RPC mode", () => {
let agent: ChildProcess;
let client: RpcClient;
let sessionDir: string;
beforeEach(() => {
// Create a unique temp directory for sessions
sessionDir = join(tmpdir(), `pi-rpc-test-${Date.now()}`);
client = new RpcClient({
cliPath: join(__dirname, "..", "dist", "cli.js"),
cwd: join(__dirname, ".."),
env: { PI_CODING_AGENT_DIR: sessionDir },
provider: "anthropic",
model: "claude-sonnet-4-5",
});
});
afterEach(() => {
// Kill the agent if still running
if (agent && !agent.killed) {
agent.kill("SIGKILL");
}
// Clean up session directory
afterEach(async () => {
await client.stop();
if (sessionDir && existsSync(sessionDir)) {
rmSync(sessionDir, { recursive: true });
}
});
test("should get state", async () => {
await client.start();
const state = await client.getState();
expect(state.model).toBeDefined();
expect(state.model?.provider).toBe("anthropic");
expect(state.model?.id).toBe("claude-sonnet-4-5");
expect(state.isStreaming).toBe(false);
expect(state.messageCount).toBe(0);
}, 30000);
test("should save messages to session file", async () => {
// Spawn agent in RPC mode with custom session directory
agent = spawn(
"node",
["dist/cli.js", "--mode", "rpc", "--provider", "anthropic", "--model", "claude-sonnet-4-5"],
{
cwd: join(__dirname, ".."),
env: {
...process.env,
PI_CODING_AGENT_DIR: sessionDir,
},
},
);
await client.start();
const events: AgentEvent[] = [];
// Send prompt and wait for completion
const events = await client.promptAndWait("Reply with just the word 'hello'");
// Parse agent events
const rl = readline.createInterface({ input: agent.stdout!, terminal: false });
// Collect stderr for debugging
let stderr = "";
agent.stderr?.on("data", (data) => {
stderr += data.toString();
});
// Wait for agent_end which signals the full prompt/response cycle is complete
const waitForAgentEnd = new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => reject(new Error("Timeout waiting for agent_end")), 60000);
rl.on("line", (line: string) => {
try {
const event = JSON.parse(line) as AgentEvent;
events.push(event);
// agent_end means the full prompt cycle completed (user msg + assistant response)
if (event.type === "agent_end") {
clearTimeout(timeout);
resolve();
}
} catch {
// Ignore non-JSON lines
}
});
rl.on("close", () => {
clearTimeout(timeout);
reject(new Error("Agent stdout closed before agent_end"));
});
});
// Send a simple prompt - the LLM will respond
agent.stdin!.write(JSON.stringify({ type: "prompt", message: "Reply with just the word 'hello'" }) + "\n");
// Wait for full prompt/response cycle to complete
await waitForAgentEnd;
// Check that message_end events were emitted
// Should have message events
const messageEndEvents = events.filter((e) => e.type === "message_end");
expect(messageEndEvents.length).toBeGreaterThanOrEqual(2); // user + assistant
// Wait a bit for file writes to complete
// Wait for file writes
await new Promise((resolve) => setTimeout(resolve, 200));
// Kill the agent gracefully
agent.kill("SIGTERM");
// Find and verify the session file
// Verify session file
const sessionsPath = join(sessionDir, "sessions");
expect(existsSync(sessionsPath), `Sessions path should exist: ${sessionsPath}. Stderr: ${stderr}`).toBe(true);
expect(existsSync(sessionsPath)).toBe(true);
// Find the session directory (it's based on cwd)
const sessionDirs = readdirSync(sessionsPath);
expect(sessionDirs.length, `Should have at least one session dir. Stderr: ${stderr}`).toBeGreaterThan(0);
expect(sessionDirs.length).toBeGreaterThan(0);
const cwdSessionDir = join(sessionsPath, sessionDirs[0]);
const allFiles = readdirSync(cwdSessionDir);
const sessionFiles = allFiles.filter((f) => f.endsWith(".jsonl"));
expect(
sessionFiles.length,
`Should have exactly one session file. Dir: ${cwdSessionDir}, Files: ${JSON.stringify(allFiles)}, Stderr: ${stderr}`,
).toBe(1);
const sessionFiles = readdirSync(cwdSessionDir).filter((f) => f.endsWith(".jsonl"));
expect(sessionFiles.length).toBe(1);
// Read and verify session content
const sessionContent = readFileSync(join(cwdSessionDir, sessionFiles[0]), "utf8");
const lines = sessionContent.trim().split("\n");
// Should have session header and at least 2 messages (user + assistant)
expect(lines.length).toBeGreaterThanOrEqual(3);
const entries = lines.map((line) => JSON.parse(line));
const entries = sessionContent
.trim()
.split("\n")
.map((line) => JSON.parse(line));
// First entry should be session header
expect(entries[0].type).toBe("session");
@ -139,83 +87,20 @@ describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_T
}, 90000);
test("should handle manual compaction", async () => {
// Spawn agent in RPC mode
agent = spawn(
"node",
["dist/cli.js", "--mode", "rpc", "--provider", "anthropic", "--model", "claude-sonnet-4-5"],
{
cwd: join(__dirname, ".."),
env: {
...process.env,
PI_CODING_AGENT_DIR: sessionDir,
},
},
);
await client.start();
const events: (AgentEvent | CompactionEntry | { type: "error"; error: string })[] = [];
// First send a prompt to have messages to compact
await client.promptAndWait("Say hello");
const rl = readline.createInterface({ input: agent.stdout!, terminal: false });
let stderr = "";
agent.stderr?.on("data", (data) => {
stderr += data.toString();
});
// Helper to wait for a specific event type
const waitForEvent = (eventType: string, timeout = 60000) =>
new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => reject(new Error(`Timeout waiting for ${eventType}`)), timeout);
const checkExisting = () => {
if (events.some((e) => e.type === eventType)) {
clearTimeout(timer);
resolve();
return true;
}
return false;
};
if (checkExisting()) return;
const handler = (line: string) => {
try {
const event = JSON.parse(line);
events.push(event);
if (event.type === eventType) {
clearTimeout(timer);
rl.off("line", handler);
resolve();
}
} catch {
// Ignore non-JSON
}
};
rl.on("line", handler);
});
// First, send a prompt to have some messages to compact
agent.stdin!.write(JSON.stringify({ type: "prompt", message: "Say hello" }) + "\n");
await waitForEvent("agent_end");
// Clear events to focus on compaction
events.length = 0;
// Send compact command
agent.stdin!.write(JSON.stringify({ type: "compact" }) + "\n");
await waitForEvent("compaction");
// Verify compaction event
const compactionEvent = events.find((e) => e.type === "compaction") as CompactionEntry | undefined;
expect(compactionEvent).toBeDefined();
expect(compactionEvent!.summary).toBeDefined();
expect(compactionEvent!.tokensBefore).toBeGreaterThan(0);
// Compact
const result = await client.compact();
expect(result.summary).toBeDefined();
expect(result.tokensBefore).toBeGreaterThan(0);
// Wait for file writes
await new Promise((resolve) => setTimeout(resolve, 200));
agent.kill("SIGTERM");
// Verify compaction was saved to session file
// Verify compaction in session file
const sessionsPath = join(sessionDir, "sessions");
const sessionDirs = readdirSync(sessionsPath);
const cwdSessionDir = join(sessionsPath, sessionDirs[0]);
@ -226,97 +111,34 @@ describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_T
.split("\n")
.map((line) => JSON.parse(line));
// Should have a compaction entry
const compactionEntries = entries.filter((e: { type: string }) => e.type === "compaction");
expect(compactionEntries.length).toBe(1);
expect(compactionEntries[0].summary).toBeDefined();
}, 120000);
test("should execute bash command and add to context", async () => {
// Spawn agent in RPC mode
agent = spawn(
"node",
["dist/cli.js", "--mode", "rpc", "--provider", "anthropic", "--model", "claude-sonnet-4-5"],
{
cwd: join(__dirname, ".."),
env: {
...process.env,
PI_CODING_AGENT_DIR: sessionDir,
},
},
);
test("should execute bash command", async () => {
await client.start();
const events: (
| AgentEvent
| { type: "bash_end"; message: BashExecutionMessage }
| { type: "error"; error: string }
)[] = [];
const result = await client.bash("echo hello");
expect(result.output.trim()).toBe("hello");
expect(result.exitCode).toBe(0);
expect(result.cancelled).toBe(false);
}, 30000);
const rl = readline.createInterface({ input: agent.stdout!, terminal: false });
test("should add bash output to context", async () => {
await client.start();
let stderr = "";
agent.stderr?.on("data", (data) => {
stderr += data.toString();
});
// First send a prompt to initialize session
await client.promptAndWait("Say hi");
// Set up persistent event collector BEFORE sending any commands
// This is critical for fast commands like bash that complete before
// a per-call handler would be registered
rl.on("line", (line: string) => {
try {
const event = JSON.parse(line);
events.push(event);
} catch {
// Ignore non-JSON
}
});
// Helper to wait for a specific event type by polling collected events
const waitForEvent = (eventType: string, timeout = 60000) =>
new Promise<void>((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`Timeout waiting for ${eventType}. Stderr: ${stderr}`)),
timeout,
);
const check = () => {
if (events.some((e) => e.type === eventType)) {
clearTimeout(timer);
resolve();
} else {
setTimeout(check, 50);
}
};
check();
});
// Send a bash command
agent.stdin!.write(JSON.stringify({ type: "bash", command: "echo hello" }) + "\n");
await waitForEvent("bash_end");
// Verify bash_end event
const bashEvent = events.find((e) => e.type === "bash_end") as
| { type: "bash_end"; message: BashExecutionMessage }
| undefined;
expect(bashEvent).toBeDefined();
expect(bashEvent!.message.role).toBe("bashExecution");
expect(bashEvent!.message.command).toBe("echo hello");
expect(bashEvent!.message.output.trim()).toBe("hello");
expect(bashEvent!.message.exitCode).toBe(0);
expect(bashEvent!.message.cancelled).toBe(false);
// Clear events for next phase
events.length = 0;
// Session only initializes after user+assistant exchange, so send a prompt
agent.stdin!.write(JSON.stringify({ type: "prompt", message: "Say hi" }) + "\n");
await waitForEvent("agent_end");
// Run bash command
const uniqueValue = `test-${Date.now()}`;
await client.bash(`echo ${uniqueValue}`);
// Wait for file writes
await new Promise((resolve) => setTimeout(resolve, 200));
agent.kill("SIGTERM");
// Verify bash execution was saved to session file
// Verify bash message in session
const sessionsPath = join(sessionDir, "sessions");
const sessionDirs = readdirSync(sessionsPath);
const cwdSessionDir = join(sessionsPath, sessionDirs[0]);
@ -327,92 +149,27 @@ describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_T
.split("\n")
.map((line) => JSON.parse(line));
// Should have a bashExecution message
const bashMessages = entries.filter(
(e: { type: string; message?: { role: string } }) =>
e.type === "message" && e.message?.role === "bashExecution",
);
expect(bashMessages.length).toBe(1);
expect(bashMessages[0].message.command).toBe("echo hello");
expect(bashMessages[0].message.output.trim()).toBe("hello");
expect(bashMessages[0].message.output).toContain(uniqueValue);
}, 90000);
test("should include bash output in LLM context", async () => {
// Spawn agent in RPC mode
agent = spawn(
"node",
["dist/cli.js", "--mode", "rpc", "--provider", "anthropic", "--model", "claude-sonnet-4-5"],
{
cwd: join(__dirname, ".."),
env: {
...process.env,
PI_CODING_AGENT_DIR: sessionDir,
},
},
await client.start();
// Run a bash command with a unique value
const uniqueValue = `unique-${Date.now()}`;
await client.bash(`echo ${uniqueValue}`);
// Ask the LLM what the output was
const events = await client.promptAndWait(
"What was the exact output of the echo command I just ran? Reply with just the value, nothing else.",
);
const events: (
| AgentEvent
| { type: "bash_end"; message: BashExecutionMessage }
| { type: "error"; error: string }
)[] = [];
const rl = readline.createInterface({ input: agent.stdout!, terminal: false });
let stderr = "";
agent.stderr?.on("data", (data) => {
stderr += data.toString();
});
// Set up persistent event collector BEFORE sending any commands
rl.on("line", (line: string) => {
try {
const event = JSON.parse(line);
events.push(event);
} catch {
// Ignore non-JSON
}
});
// Helper to wait for a specific event type by polling collected events
const waitForEvent = (eventType: string, timeout = 60000) =>
new Promise<void>((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`Timeout waiting for ${eventType}. Stderr: ${stderr}`)),
timeout,
);
const check = () => {
if (events.some((e) => e.type === eventType)) {
clearTimeout(timer);
resolve();
} else {
setTimeout(check, 50);
}
};
check();
});
// Wait for agent to initialize (session manager, etc.)
await new Promise((resolve) => setTimeout(resolve, 500));
// First, run a bash command with a unique value
const uniqueValue = `test-${Date.now()}`;
agent.stdin!.write(JSON.stringify({ type: "bash", command: `echo ${uniqueValue}` }) + "\n");
await waitForEvent("bash_end");
// Clear events but keep collecting new ones
events.length = 0;
// Now ask the LLM what the output was - it should be in context
agent.stdin!.write(
JSON.stringify({
type: "prompt",
message: `What was the exact output of the echo command I just ran? Reply with just the value, nothing else.`,
}) + "\n",
);
await waitForEvent("agent_end");
// Find the assistant's response
// Find assistant's response
const messageEndEvents = events.filter((e) => e.type === "message_end") as AgentEvent[];
const assistantMessage = messageEndEvents.find(
(e) => e.type === "message_end" && (e as any).message?.role === "assistant",
@ -420,10 +177,109 @@ describe.skipIf(!process.env.ANTHROPIC_API_KEY && !process.env.ANTHROPIC_OAUTH_T
expect(assistantMessage).toBeDefined();
// The assistant should mention the unique value from the bash output
const textContent = assistantMessage.message.content.find((c: any) => c.type === "text");
expect(textContent?.text).toContain(uniqueValue);
}, 90000);
agent.kill("SIGTERM");
test("should set and get thinking level", async () => {
await client.start();
// Set thinking level
await client.setThinkingLevel("high");
// Verify via state
const state = await client.getState();
expect(state.thinkingLevel).toBe("high");
}, 30000);
test("should cycle thinking level", async () => {
await client.start();
// Get initial level
const initialState = await client.getState();
const initialLevel = initialState.thinkingLevel;
// Cycle
const result = await client.cycleThinkingLevel();
expect(result).toBeDefined();
expect(result!.level).not.toBe(initialLevel);
// Verify via state
const newState = await client.getState();
expect(newState.thinkingLevel).toBe(result!.level);
}, 30000);
test("should get available models", async () => {
await client.start();
const models = await client.getAvailableModels();
expect(models.length).toBeGreaterThan(0);
// All models should have required fields
for (const model of models) {
expect(model.provider).toBeDefined();
expect(model.id).toBeDefined();
expect(model.contextWindow).toBeGreaterThan(0);
expect(typeof model.reasoning).toBe("boolean");
}
}, 30000);
test("should get session stats", async () => {
await client.start();
// Send a prompt first
await client.promptAndWait("Hello");
const stats = await client.getSessionStats();
expect(stats.sessionFile).toBeDefined();
expect(stats.sessionId).toBeDefined();
expect(stats.userMessages).toBeGreaterThanOrEqual(1);
expect(stats.assistantMessages).toBeGreaterThanOrEqual(1);
}, 90000);
test("should reset session", async () => {
await client.start();
// Send a prompt
await client.promptAndWait("Hello");
// Verify messages exist
let state = await client.getState();
expect(state.messageCount).toBeGreaterThan(0);
// Reset
await client.reset();
// Verify messages cleared
state = await client.getState();
expect(state.messageCount).toBe(0);
}, 90000);
test("should export to HTML", async () => {
await client.start();
// Send a prompt first
await client.promptAndWait("Hello");
// Export
const result = await client.exportHtml();
expect(result.path).toBeDefined();
expect(result.path.endsWith(".html")).toBe(true);
expect(existsSync(result.path)).toBe(true);
}, 90000);
test("should get last assistant text", async () => {
await client.start();
// Initially null
let text = await client.getLastAssistantText();
expect(text).toBeNull();
// Send prompt
await client.promptAndWait("Reply with just: test123");
// Should have text now
text = await client.getLastAssistantText();
expect(text).toContain("test123");
}, 90000);
});