Refactor subagent tool, fix custom tool discovery, fix JSON mode stdout flush

Breaking changes:
- Custom tools now require index.ts entry point in subdirectory
  (e.g., tools/mytool/index.ts instead of tools/mytool.ts)

Subagent tool improvements:
- Refactored to use Message[] from ai package instead of custom types
- Extracted agent discovery to separate agents.ts module
- Added parallel mode streaming (shows progress from all tasks)
- Added turn count to usage stats footer
- Removed redundant Query section from scout output

Fixes:
- JSON mode stdout flush: Fixed race condition where pi --mode json
  could exit before all output was written, causing consumers to
  miss final events

Also:
- Added signal/timeout support to pi.exec() for custom tools and hooks
- Renamed pi-pods bin to avoid conflict with pi
This commit is contained in:
Mario Zechner 2025-12-19 04:10:09 +01:00
parent 1151975afe
commit 4fb3af93fb
15 changed files with 894 additions and 698 deletions

View file

@ -11,7 +11,14 @@ import { fileURLToPath } from "node:url";
import { createJiti } from "jiti";
import { getAgentDir } from "../../config.js";
import type { HookUIContext } from "../hooks/types.js";
import type { CustomToolFactory, CustomToolsLoadResult, ExecResult, LoadedCustomTool, ToolAPI } from "./types.js";
import type {
CustomToolFactory,
CustomToolsLoadResult,
ExecOptions,
ExecResult,
LoadedCustomTool,
ToolAPI,
} from "./types.js";
// Create require function to resolve module paths at runtime
const require = createRequire(import.meta.url);
@ -69,8 +76,9 @@ function resolveToolPath(toolPath: string, cwd: string): string {
/**
* Execute a command and return stdout/stderr/code.
* Supports cancellation via AbortSignal and timeout.
*/
async function execCommand(command: string, args: string[], cwd: string): Promise<ExecResult> {
async function execCommand(command: string, args: string[], cwd: string, options?: ExecOptions): Promise<ExecResult> {
return new Promise((resolve) => {
const proc = spawn(command, args, {
cwd,
@ -80,6 +88,37 @@ async function execCommand(command: string, args: string[], cwd: string): Promis
let stdout = "";
let stderr = "";
let killed = false;
let timeoutId: NodeJS.Timeout | undefined;
const killProcess = () => {
if (!killed) {
killed = true;
proc.kill("SIGTERM");
// Force kill after 5 seconds if SIGTERM doesn't work
setTimeout(() => {
if (!proc.killed) {
proc.kill("SIGKILL");
}
}, 5000);
}
};
// Handle abort signal
if (options?.signal) {
if (options.signal.aborted) {
killProcess();
} else {
options.signal.addEventListener("abort", killProcess, { once: true });
}
}
// Handle timeout
if (options?.timeout && options.timeout > 0) {
timeoutId = setTimeout(() => {
killProcess();
}, options.timeout);
}
proc.stdout.on("data", (data) => {
stdout += data.toString();
@ -90,18 +129,28 @@ async function execCommand(command: string, args: string[], cwd: string): Promis
});
proc.on("close", (code) => {
if (timeoutId) clearTimeout(timeoutId);
if (options?.signal) {
options.signal.removeEventListener("abort", killProcess);
}
resolve({
stdout,
stderr,
code: code ?? 0,
killed,
});
});
proc.on("error", (err) => {
if (timeoutId) clearTimeout(timeoutId);
if (options?.signal) {
options.signal.removeEventListener("abort", killProcess);
}
resolve({
stdout,
stderr: stderr || err.message,
code: 1,
killed,
});
});
});
@ -182,7 +231,7 @@ export async function loadCustomTools(
// Shared API object - all tools get the same instance
const sharedApi: ToolAPI = {
cwd,
exec: (command: string, args: string[]) => execCommand(command, args, cwd),
exec: (command: string, args: string[], options?: ExecOptions) => execCommand(command, args, cwd, options),
ui: createNoOpUIContext(),
hasUI: false,
};
@ -224,21 +273,32 @@ export async function loadCustomTools(
/**
* Discover tool files from a directory.
* Returns all .ts files (and symlinks to .ts files) in the directory (non-recursive).
* Only loads index.ts files from subdirectories (e.g., tools/mytool/index.ts).
*/
function discoverToolsInDir(dir: string): string[] {
if (!fs.existsSync(dir)) {
return [];
}
const tools: string[] = [];
try {
const entries = fs.readdirSync(dir, { withFileTypes: true });
return entries
.filter((e) => (e.isFile() || e.isSymbolicLink()) && e.name.endsWith(".ts"))
.map((e) => path.join(dir, e.name));
for (const entry of entries) {
if (entry.isDirectory() || entry.isSymbolicLink()) {
// Check for index.ts in subdirectory
const indexPath = path.join(dir, entry.name, "index.ts");
if (fs.existsSync(indexPath)) {
tools.push(indexPath);
}
}
}
} catch {
return [];
}
return tools;
}
/**

View file

@ -19,6 +19,15 @@ export interface ExecResult {
stdout: string;
stderr: string;
code: number;
/** True if the process was killed due to signal or timeout */
killed?: boolean;
}
export interface ExecOptions {
/** AbortSignal to cancel the process */
signal?: AbortSignal;
/** Timeout in milliseconds */
timeout?: number;
}
/** API passed to custom tool factory (stable across session changes) */
@ -26,7 +35,7 @@ export interface ToolAPI {
/** Current working directory */
cwd: string;
/** Execute a command */
exec(command: string, args: string[]): Promise<ExecResult>;
exec(command: string, args: string[], options?: ExecOptions): Promise<ExecResult>;
/** UI methods for user interaction (select, confirm, input, notify) */
ui: ToolUIContext;
/** Whether UI is available (false in print/RPC mode) */

View file

@ -6,6 +6,7 @@ import { spawn } from "node:child_process";
import type { LoadedHook, SendHandler } from "./loader.js";
import type {
BranchEventResult,
ExecOptions,
ExecResult,
HookError,
HookEvent,
@ -28,13 +29,45 @@ export type HookErrorListener = (error: HookError) => void;
/**
* Execute a command and return stdout/stderr/code.
* Supports cancellation via AbortSignal and timeout.
*/
async function exec(command: string, args: string[], cwd: string): Promise<ExecResult> {
async function exec(command: string, args: string[], cwd: string, options?: ExecOptions): Promise<ExecResult> {
return new Promise((resolve) => {
const proc = spawn(command, args, { cwd, shell: false });
let stdout = "";
let stderr = "";
let killed = false;
let timeoutId: NodeJS.Timeout | undefined;
const killProcess = () => {
if (!killed) {
killed = true;
proc.kill("SIGTERM");
// Force kill after 5 seconds if SIGTERM doesn't work
setTimeout(() => {
if (!proc.killed) {
proc.kill("SIGKILL");
}
}, 5000);
}
};
// Handle abort signal
if (options?.signal) {
if (options.signal.aborted) {
killProcess();
} else {
options.signal.addEventListener("abort", killProcess, { once: true });
}
}
// Handle timeout
if (options?.timeout && options.timeout > 0) {
timeoutId = setTimeout(() => {
killProcess();
}, options.timeout);
}
proc.stdout?.on("data", (data) => {
stdout += data.toString();
@ -45,11 +78,19 @@ async function exec(command: string, args: string[], cwd: string): Promise<ExecR
});
proc.on("close", (code) => {
resolve({ stdout, stderr, code: code ?? 0 });
if (timeoutId) clearTimeout(timeoutId);
if (options?.signal) {
options.signal.removeEventListener("abort", killProcess);
}
resolve({ stdout, stderr, code: code ?? 0, killed });
});
proc.on("error", (_err) => {
resolve({ stdout, stderr, code: 1 });
if (timeoutId) clearTimeout(timeoutId);
if (options?.signal) {
options.signal.removeEventListener("abort", killProcess);
}
resolve({ stdout, stderr, code: 1, killed });
});
});
}
@ -166,7 +207,7 @@ export class HookRunner {
*/
private createContext(): HookEventContext {
return {
exec: (command: string, args: string[]) => exec(command, args, this.cwd),
exec: (command: string, args: string[], options?: ExecOptions) => exec(command, args, this.cwd, options),
ui: this.uiContext,
hasUI: this.hasUI,
cwd: this.cwd,

View file

@ -27,6 +27,15 @@ export interface ExecResult {
stdout: string;
stderr: string;
code: number;
/** True if the process was killed due to signal or timeout */
killed?: boolean;
}
export interface ExecOptions {
/** AbortSignal to cancel the process */
signal?: AbortSignal;
/** Timeout in milliseconds */
timeout?: number;
}
/**
@ -65,7 +74,7 @@ export interface HookUIContext {
*/
export interface HookEventContext {
/** Execute a command and return stdout/stderr/code */
exec(command: string, args: string[]): Promise<ExecResult>;
exec(command: string, args: string[], options?: ExecOptions): Promise<ExecResult>;
/** UI methods for user interaction */
ui: HookUIContext;
/** Whether UI is available (false in print mode) */

View file

@ -445,6 +445,10 @@ export async function main(args: string[]) {
await runPrintMode(session, mode, parsed.messages, initialMessage, initialAttachments);
// Clean up and exit (file watchers keep process alive)
stopThemeWatcher();
// Wait for stdout to fully flush before exiting
if (process.stdout.writableLength > 0) {
await new Promise<void>((resolve) => process.stdout.once("drain", resolve));
}
process.exit(0);
}
}

View file

@ -109,4 +109,13 @@ export async function runPrintMode(
}
}
}
// Ensure stdout is fully flushed before returning
// This prevents race conditions where the process exits before all output is written
await new Promise<void>((resolve, reject) => {
process.stdout.write("", (err) => {
if (err) reject(err);
else resolve();
});
});
}