mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-15 14:03:49 +00:00
- Update settings-manager with steeringMode/followUpMode (migrates old queueMode) - Update sdk.ts to use new mode options - Update settings-selector UI to show both modes - Add Alt+Enter keybind for follow-up messages - Update RPC API: steer/follow_up commands, set_steering_mode/set_follow_up_mode - Update rpc-client with new methods - Delete dead code: queue-mode-selector.ts - Update tests for new API - Update mom/context.ts stubs - Update web-ui example
495 lines
14 KiB
TypeScript
495 lines
14 KiB
TypeScript
/**
|
|
* RPC Client for programmatic access to the coding agent.
|
|
*
|
|
* Spawns the agent in RPC mode and provides a typed API for all operations.
|
|
*/
|
|
|
|
import { type ChildProcess, spawn } from "node:child_process";
|
|
import * as readline from "node:readline";
|
|
import type { AgentEvent, AgentMessage, ThinkingLevel } from "@mariozechner/pi-agent-core";
|
|
import type { ImageContent } from "@mariozechner/pi-ai";
|
|
import type { SessionStats } from "../../core/agent-session.js";
|
|
import type { BashResult } from "../../core/bash-executor.js";
|
|
import type { CompactionResult } from "../../core/compaction/index.js";
|
|
import type { RpcCommand, RpcResponse, RpcSessionState } from "./rpc-types.js";
|
|
|
|
// ============================================================================
|
|
// Types
|
|
// ============================================================================
|
|
|
|
/** Distributive Omit that works with union types */
|
|
type DistributiveOmit<T, K extends keyof T> = T extends unknown ? Omit<T, K> : never;
|
|
|
|
/** RpcCommand without the id field (for internal send) */
|
|
type RpcCommandBody = DistributiveOmit<RpcCommand, "id">;
|
|
|
|
export interface RpcClientOptions {
|
|
/** Path to the CLI entry point (default: searches for dist/cli.js) */
|
|
cliPath?: string;
|
|
/** Working directory for the agent */
|
|
cwd?: string;
|
|
/** Environment variables */
|
|
env?: Record<string, string>;
|
|
/** Provider to use */
|
|
provider?: string;
|
|
/** Model ID to use */
|
|
model?: string;
|
|
/** Additional CLI arguments */
|
|
args?: string[];
|
|
}
|
|
|
|
export interface ModelInfo {
|
|
provider: string;
|
|
id: string;
|
|
contextWindow: number;
|
|
reasoning: boolean;
|
|
}
|
|
|
|
export type RpcEventListener = (event: AgentEvent) => void;
|
|
|
|
// ============================================================================
|
|
// RPC Client
|
|
// ============================================================================
|
|
|
|
export class RpcClient {
|
|
private process: ChildProcess | null = null;
|
|
private rl: readline.Interface | null = null;
|
|
private eventListeners: RpcEventListener[] = [];
|
|
private pendingRequests: Map<string, { resolve: (response: RpcResponse) => void; reject: (error: Error) => void }> =
|
|
new Map();
|
|
private requestId = 0;
|
|
private stderr = "";
|
|
|
|
constructor(private options: RpcClientOptions = {}) {}
|
|
|
|
/**
|
|
* Start the RPC agent process.
|
|
*/
|
|
async start(): Promise<void> {
|
|
if (this.process) {
|
|
throw new Error("Client already started");
|
|
}
|
|
|
|
const cliPath = this.options.cliPath ?? "dist/cli.js";
|
|
const args = ["--mode", "rpc"];
|
|
|
|
if (this.options.provider) {
|
|
args.push("--provider", this.options.provider);
|
|
}
|
|
if (this.options.model) {
|
|
args.push("--model", this.options.model);
|
|
}
|
|
if (this.options.args) {
|
|
args.push(...this.options.args);
|
|
}
|
|
|
|
this.process = spawn("node", [cliPath, ...args], {
|
|
cwd: this.options.cwd,
|
|
env: { ...process.env, ...this.options.env },
|
|
stdio: ["pipe", "pipe", "pipe"],
|
|
});
|
|
|
|
// Collect stderr for debugging
|
|
this.process.stderr?.on("data", (data) => {
|
|
this.stderr += data.toString();
|
|
});
|
|
|
|
// Set up line reader for stdout
|
|
this.rl = readline.createInterface({
|
|
input: this.process.stdout!,
|
|
terminal: false,
|
|
});
|
|
|
|
this.rl.on("line", (line) => {
|
|
this.handleLine(line);
|
|
});
|
|
|
|
// Wait a moment for process to initialize
|
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
|
|
|
if (this.process.exitCode !== null) {
|
|
throw new Error(`Agent process exited immediately with code ${this.process.exitCode}. Stderr: ${this.stderr}`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Stop the RPC agent process.
|
|
*/
|
|
async stop(): Promise<void> {
|
|
if (!this.process) return;
|
|
|
|
this.rl?.close();
|
|
this.process.kill("SIGTERM");
|
|
|
|
// Wait for process to exit
|
|
await new Promise<void>((resolve) => {
|
|
const timeout = setTimeout(() => {
|
|
this.process?.kill("SIGKILL");
|
|
resolve();
|
|
}, 1000);
|
|
|
|
this.process?.on("exit", () => {
|
|
clearTimeout(timeout);
|
|
resolve();
|
|
});
|
|
});
|
|
|
|
this.process = null;
|
|
this.rl = null;
|
|
this.pendingRequests.clear();
|
|
}
|
|
|
|
/**
|
|
* Subscribe to agent events.
|
|
*/
|
|
onEvent(listener: RpcEventListener): () => void {
|
|
this.eventListeners.push(listener);
|
|
return () => {
|
|
const index = this.eventListeners.indexOf(listener);
|
|
if (index !== -1) {
|
|
this.eventListeners.splice(index, 1);
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Get collected stderr output (useful for debugging).
|
|
*/
|
|
getStderr(): string {
|
|
return this.stderr;
|
|
}
|
|
|
|
// =========================================================================
|
|
// Command Methods
|
|
// =========================================================================
|
|
|
|
/**
|
|
* Send a prompt to the agent.
|
|
* Returns immediately after sending; use onEvent() to receive streaming events.
|
|
* Use waitForIdle() to wait for completion.
|
|
*/
|
|
async prompt(message: string, images?: ImageContent[]): Promise<void> {
|
|
await this.send({ type: "prompt", message, images });
|
|
}
|
|
|
|
/**
|
|
* Queue a steering message to interrupt the agent mid-run.
|
|
*/
|
|
async steer(message: string): Promise<void> {
|
|
await this.send({ type: "steer", message });
|
|
}
|
|
|
|
/**
|
|
* Queue a follow-up message to be processed after the agent finishes.
|
|
*/
|
|
async followUp(message: string): Promise<void> {
|
|
await this.send({ type: "follow_up", message });
|
|
}
|
|
|
|
/**
|
|
* Abort current operation.
|
|
*/
|
|
async abort(): Promise<void> {
|
|
await this.send({ type: "abort" });
|
|
}
|
|
|
|
/**
|
|
* Start a new session, optionally with parent tracking.
|
|
* @param parentSession - Optional parent session path for lineage tracking
|
|
* @returns Object with `cancelled: true` if a hook cancelled the new session
|
|
*/
|
|
async newSession(parentSession?: string): Promise<{ cancelled: boolean }> {
|
|
const response = await this.send({ type: "new_session", parentSession });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Get current session state.
|
|
*/
|
|
async getState(): Promise<RpcSessionState> {
|
|
const response = await this.send({ type: "get_state" });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Set model by provider and ID.
|
|
*/
|
|
async setModel(provider: string, modelId: string): Promise<{ provider: string; id: string }> {
|
|
const response = await this.send({ type: "set_model", provider, modelId });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Cycle to next model.
|
|
*/
|
|
async cycleModel(): Promise<{
|
|
model: { provider: string; id: string };
|
|
thinkingLevel: ThinkingLevel;
|
|
isScoped: boolean;
|
|
} | null> {
|
|
const response = await this.send({ type: "cycle_model" });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Get list of available models.
|
|
*/
|
|
async getAvailableModels(): Promise<ModelInfo[]> {
|
|
const response = await this.send({ type: "get_available_models" });
|
|
return this.getData<{ models: ModelInfo[] }>(response).models;
|
|
}
|
|
|
|
/**
|
|
* Set thinking level.
|
|
*/
|
|
async setThinkingLevel(level: ThinkingLevel): Promise<void> {
|
|
await this.send({ type: "set_thinking_level", level });
|
|
}
|
|
|
|
/**
|
|
* Cycle thinking level.
|
|
*/
|
|
async cycleThinkingLevel(): Promise<{ level: ThinkingLevel } | null> {
|
|
const response = await this.send({ type: "cycle_thinking_level" });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Set steering mode.
|
|
*/
|
|
async setSteeringMode(mode: "all" | "one-at-a-time"): Promise<void> {
|
|
await this.send({ type: "set_steering_mode", mode });
|
|
}
|
|
|
|
/**
|
|
* Set follow-up mode.
|
|
*/
|
|
async setFollowUpMode(mode: "all" | "one-at-a-time"): Promise<void> {
|
|
await this.send({ type: "set_follow_up_mode", mode });
|
|
}
|
|
|
|
/**
|
|
* Compact session context.
|
|
*/
|
|
async compact(customInstructions?: string): Promise<CompactionResult> {
|
|
const response = await this.send({ type: "compact", customInstructions });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Set auto-compaction enabled/disabled.
|
|
*/
|
|
async setAutoCompaction(enabled: boolean): Promise<void> {
|
|
await this.send({ type: "set_auto_compaction", enabled });
|
|
}
|
|
|
|
/**
|
|
* Set auto-retry enabled/disabled.
|
|
*/
|
|
async setAutoRetry(enabled: boolean): Promise<void> {
|
|
await this.send({ type: "set_auto_retry", enabled });
|
|
}
|
|
|
|
/**
|
|
* Abort in-progress retry.
|
|
*/
|
|
async abortRetry(): Promise<void> {
|
|
await this.send({ type: "abort_retry" });
|
|
}
|
|
|
|
/**
|
|
* Execute a bash command.
|
|
*/
|
|
async bash(command: string): Promise<BashResult> {
|
|
const response = await this.send({ type: "bash", command });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Abort running bash command.
|
|
*/
|
|
async abortBash(): Promise<void> {
|
|
await this.send({ type: "abort_bash" });
|
|
}
|
|
|
|
/**
|
|
* Get session statistics.
|
|
*/
|
|
async getSessionStats(): Promise<SessionStats> {
|
|
const response = await this.send({ type: "get_session_stats" });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Export session to HTML.
|
|
*/
|
|
async exportHtml(outputPath?: string): Promise<{ path: string }> {
|
|
const response = await this.send({ type: "export_html", outputPath });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Switch to a different session file.
|
|
* @returns Object with `cancelled: true` if a hook cancelled the switch
|
|
*/
|
|
async switchSession(sessionPath: string): Promise<{ cancelled: boolean }> {
|
|
const response = await this.send({ type: "switch_session", sessionPath });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Branch from a specific message.
|
|
* @returns Object with `text` (the message text) and `cancelled` (if hook cancelled)
|
|
*/
|
|
async branch(entryId: string): Promise<{ text: string; cancelled: boolean }> {
|
|
const response = await this.send({ type: "branch", entryId });
|
|
return this.getData(response);
|
|
}
|
|
|
|
/**
|
|
* Get messages available for branching.
|
|
*/
|
|
async getBranchMessages(): Promise<Array<{ entryId: string; text: string }>> {
|
|
const response = await this.send({ type: "get_branch_messages" });
|
|
return this.getData<{ messages: Array<{ entryId: string; text: string }> }>(response).messages;
|
|
}
|
|
|
|
/**
|
|
* Get text of last assistant message.
|
|
*/
|
|
async getLastAssistantText(): Promise<string | null> {
|
|
const response = await this.send({ type: "get_last_assistant_text" });
|
|
return this.getData<{ text: string | null }>(response).text;
|
|
}
|
|
|
|
/**
|
|
* Get all messages in the session.
|
|
*/
|
|
async getMessages(): Promise<AgentMessage[]> {
|
|
const response = await this.send({ type: "get_messages" });
|
|
return this.getData<{ messages: AgentMessage[] }>(response).messages;
|
|
}
|
|
|
|
// =========================================================================
|
|
// Helpers
|
|
// =========================================================================
|
|
|
|
/**
|
|
* Wait for agent to become idle (no streaming).
|
|
* Resolves when agent_end event is received.
|
|
*/
|
|
waitForIdle(timeout = 60000): Promise<void> {
|
|
return new Promise((resolve, reject) => {
|
|
const timer = setTimeout(() => {
|
|
unsubscribe();
|
|
reject(new Error(`Timeout waiting for agent to become idle. Stderr: ${this.stderr}`));
|
|
}, timeout);
|
|
|
|
const unsubscribe = this.onEvent((event) => {
|
|
if (event.type === "agent_end") {
|
|
clearTimeout(timer);
|
|
unsubscribe();
|
|
resolve();
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Collect events until agent becomes idle.
|
|
*/
|
|
collectEvents(timeout = 60000): Promise<AgentEvent[]> {
|
|
return new Promise((resolve, reject) => {
|
|
const events: AgentEvent[] = [];
|
|
const timer = setTimeout(() => {
|
|
unsubscribe();
|
|
reject(new Error(`Timeout collecting events. Stderr: ${this.stderr}`));
|
|
}, timeout);
|
|
|
|
const unsubscribe = this.onEvent((event) => {
|
|
events.push(event);
|
|
if (event.type === "agent_end") {
|
|
clearTimeout(timer);
|
|
unsubscribe();
|
|
resolve(events);
|
|
}
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Send prompt and wait for completion, returning all events.
|
|
*/
|
|
async promptAndWait(message: string, images?: ImageContent[], timeout = 60000): Promise<AgentEvent[]> {
|
|
const eventsPromise = this.collectEvents(timeout);
|
|
await this.prompt(message, images);
|
|
return eventsPromise;
|
|
}
|
|
|
|
// =========================================================================
|
|
// Internal
|
|
// =========================================================================
|
|
|
|
private handleLine(line: string): void {
|
|
try {
|
|
const data = JSON.parse(line);
|
|
|
|
// Check if it's a response to a pending request
|
|
if (data.type === "response" && data.id && this.pendingRequests.has(data.id)) {
|
|
const pending = this.pendingRequests.get(data.id)!;
|
|
this.pendingRequests.delete(data.id);
|
|
pending.resolve(data as RpcResponse);
|
|
return;
|
|
}
|
|
|
|
// Otherwise it's an event
|
|
for (const listener of this.eventListeners) {
|
|
listener(data as AgentEvent);
|
|
}
|
|
} catch {
|
|
// Ignore non-JSON lines
|
|
}
|
|
}
|
|
|
|
private async send(command: RpcCommandBody): Promise<RpcResponse> {
|
|
if (!this.process?.stdin) {
|
|
throw new Error("Client not started");
|
|
}
|
|
|
|
const id = `req_${++this.requestId}`;
|
|
const fullCommand = { ...command, id } as RpcCommand;
|
|
|
|
return new Promise((resolve, reject) => {
|
|
this.pendingRequests.set(id, { resolve, reject });
|
|
|
|
const timeout = setTimeout(() => {
|
|
this.pendingRequests.delete(id);
|
|
reject(new Error(`Timeout waiting for response to ${command.type}. Stderr: ${this.stderr}`));
|
|
}, 30000);
|
|
|
|
this.pendingRequests.set(id, {
|
|
resolve: (response) => {
|
|
clearTimeout(timeout);
|
|
resolve(response);
|
|
},
|
|
reject: (error) => {
|
|
clearTimeout(timeout);
|
|
reject(error);
|
|
},
|
|
});
|
|
|
|
this.process!.stdin!.write(`${JSON.stringify(fullCommand)}\n`);
|
|
});
|
|
}
|
|
|
|
private getData<T>(response: RpcResponse): T {
|
|
if (!response.success) {
|
|
const errorResponse = response as Extract<RpcResponse, { success: false }>;
|
|
throw new Error(errorResponse.error);
|
|
}
|
|
// Type assertion: we trust response.data matches T based on the command sent.
|
|
// This is safe because each public method specifies the correct T for its command.
|
|
const successResponse = response as Extract<RpcResponse, { success: true; data: unknown }>;
|
|
return successResponse.data as T;
|
|
}
|
|
}
|