Add tool result streaming

- Add AgentToolUpdateCallback type and optional onUpdate callback to AgentTool.execute()
- Add tool_execution_update event with toolCallId, toolName, args, partialResult
- Normalize tool_execution_end to always use AgentToolResult (no more string fallback)
- Bash tool streams truncated rolling buffer output during execution
- ToolExecutionComponent shows last N lines when collapsed (not first N)
- Interactive mode handles tool_execution_update events
- Update RPC docs and ai/agent READMEs

fixes #44
This commit is contained in:
Mario Zechner 2025-12-16 14:53:17 +01:00
parent 8319628bc3
commit 7ac832586f
12 changed files with 362 additions and 51 deletions

195
packages/agent/README.md Normal file
View file

@ -0,0 +1,195 @@
# @mariozechner/pi-agent-core
Stateful agent abstraction with transport layer for LLM interactions. Provides a reactive `Agent` class that manages conversation state, emits granular events, and supports pluggable transports for different deployment scenarios.
## Installation
```bash
npm install @mariozechner/pi-agent-core
```
## Quick Start
```typescript
import { Agent, ProviderTransport } from '@mariozechner/pi-agent-core';
import { getModel } from '@mariozechner/pi-ai';
// Create agent with direct provider transport
const agent = new Agent({
transport: new ProviderTransport(),
initialState: {
systemPrompt: 'You are a helpful assistant.',
model: getModel('anthropic', 'claude-sonnet-4-20250514'),
thinkingLevel: 'medium',
tools: []
}
});
// Subscribe to events for reactive UI updates
agent.subscribe((event) => {
switch (event.type) {
case 'message_update':
// Stream text to UI
const content = event.message.content;
for (const block of content) {
if (block.type === 'text') console.log(block.text);
}
break;
case 'tool_execution_start':
console.log(`Calling ${event.toolName}...`);
break;
case 'tool_execution_update':
// Stream tool output (e.g., bash stdout)
console.log('Progress:', event.partialResult.content);
break;
case 'tool_execution_end':
console.log(`Result:`, event.result.content);
break;
}
});
// Send a prompt
await agent.prompt('Hello, world!');
// Access conversation state
console.log(agent.state.messages);
```
## Core Concepts
### Agent State
The `Agent` maintains reactive state:
```typescript
interface AgentState {
systemPrompt: string;
model: Model<any>;
thinkingLevel: ThinkingLevel; // 'off' | 'minimal' | 'low' | 'medium' | 'high' | 'xhigh'
tools: AgentTool<any>[];
messages: AppMessage[];
isStreaming: boolean;
streamMessage: Message | null;
pendingToolCalls: Set<string>;
error?: string;
}
```
### Events
Events provide fine-grained lifecycle information:
| Event | Description |
|-------|-------------|
| `agent_start` | Agent begins processing |
| `agent_end` | Agent completes, contains all generated messages |
| `turn_start` | New turn begins (one LLM response + tool executions) |
| `turn_end` | Turn completes with assistant message and tool results |
| `message_start` | Message begins (user, assistant, or toolResult) |
| `message_update` | Assistant message streaming update |
| `message_end` | Message completes |
| `tool_execution_start` | Tool begins execution |
| `tool_execution_update` | Tool streams progress (e.g., bash output) |
| `tool_execution_end` | Tool completes with result |
### Transports
Transports abstract LLM communication:
- **`ProviderTransport`**: Direct API calls using `@mariozechner/pi-ai`
- **`AppTransport`**: Proxy through a backend server (for browser apps)
```typescript
// Direct provider access (Node.js)
const agent = new Agent({
transport: new ProviderTransport({
apiKey: process.env.ANTHROPIC_API_KEY
})
});
// Via proxy (browser)
const agent = new Agent({
transport: new AppTransport({
endpoint: '/api/agent',
headers: { 'Authorization': 'Bearer ...' }
})
});
```
## Message Queue
Queue messages to inject at the next turn:
```typescript
// Queue mode: 'all' or 'one-at-a-time'
agent.setQueueMode('one-at-a-time');
// Queue a message while agent is streaming
await agent.queueMessage({
role: 'user',
content: 'Additional context...',
timestamp: Date.now()
});
```
## Attachments
User messages can include attachments:
```typescript
await agent.prompt('What is in this image?', [{
id: 'img1',
type: 'image',
fileName: 'photo.jpg',
mimeType: 'image/jpeg',
size: 102400,
content: base64ImageData
}]);
```
## Custom Message Types
Extend `AppMessage` for app-specific messages via declaration merging:
```typescript
declare module '@mariozechner/pi-agent-core' {
interface CustomMessages {
artifact: { role: 'artifact'; code: string; language: string };
}
}
// Now AppMessage includes your custom type
const msg: AppMessage = { role: 'artifact', code: '...', language: 'typescript' };
```
## API Reference
### Agent Methods
| Method | Description |
|--------|-------------|
| `prompt(text, attachments?)` | Send a user prompt |
| `continue()` | Continue from current context (for retry after overflow) |
| `abort()` | Abort current operation |
| `waitForIdle()` | Returns promise that resolves when agent is idle |
| `reset()` | Clear all messages and state |
| `subscribe(fn)` | Subscribe to events, returns unsubscribe function |
| `queueMessage(msg)` | Queue message for next turn |
| `clearMessageQueue()` | Clear queued messages |
### State Mutators
| Method | Description |
|--------|-------------|
| `setSystemPrompt(v)` | Update system prompt |
| `setModel(m)` | Switch model |
| `setThinkingLevel(l)` | Set reasoning level |
| `setQueueMode(m)` | Set queue mode ('all' or 'one-at-a-time') |
| `setTools(t)` | Update available tools |
| `replaceMessages(ms)` | Replace all messages |
| `appendMessage(m)` | Append a message |
| `clearMessages()` | Clear all messages |
## License
MIT

View file

@ -95,4 +95,5 @@ export type AgentEvent =
| { type: "message_end"; message: AppMessage }
// Tool execution lifecycle
| { type: "tool_execution_start"; toolCallId: string; toolName: string; args: any }
| { type: "tool_execution_update"; toolCallId: string; toolName: string; args: any; partialResult: any }
| { type: "tool_execution_end"; toolCallId: string; toolName: string; result: any; isError: boolean };

View file

@ -4,8 +4,14 @@
### Added
- **Tool result streaming**: Added `tool_execution_update` event and optional `onUpdate` callback to `AgentTool.execute()` for streaming tool output during execution. Tools can now emit partial results (e.g., bash stdout) that are forwarded to subscribers. ([#44](https://github.com/badlogic/pi-mono/issues/44))
- **X-Initiator header for GitHub Copilot**: Added X-Initiator header handling for GitHub Copilot provider to ensure correct call accounting (agent calls are not deducted from quota). Sets initiator based on last message role. ([#200](https://github.com/badlogic/pi-mono/pull/200) by [@kim0](https://github.com/kim0))
### Changed
- **Normalized tool_execution_end result**: `tool_execution_end` event now always contains `AgentToolResult` (no longer `AgentToolResult | string`). Errors are wrapped in the standard result format.
### Fixed
- **Reasoning disabled by default**: When `reasoning` option is not specified, thinking is now explicitly disabled for all providers. Previously, some providers like Gemini with "dynamic thinking" would use their default (thinking ON), causing unexpected token usage. This was the original intended behavior. ([#180](https://github.com/badlogic/pi-mono/pull/180) by [@markusylisiurunen](https://github.com/markusylisiurunen))

View file

@ -822,10 +822,12 @@ const stream = agentLoop(
// 5. message_start - Assistant message starts
// 6. message_update - Assistant streams response with tool calls
// 7. message_end - Assistant message ends
// 8. tool_execution_start - First calculation (15 * 20)
// 9. tool_execution_end - Result: 300
// 10. tool_execution_start - Second calculation (30 * 40)
// 11. tool_execution_end - Result: 1200
// 8. tool_execution_start - First calculation (15 * 20)
// 9. tool_execution_update - Streaming progress (for long-running tools)
// 10. tool_execution_end - Result: 300
// 11. tool_execution_start - Second calculation (30 * 40)
// 12. tool_execution_update - Streaming progress
// 13. tool_execution_end - Result: 1200
// 12. message_start - Tool result message for first calculation
// 13. message_end - Tool result message ends
// 14. message_start - Tool result message for second calculation
@ -876,11 +878,16 @@ for await (const event of stream) {
console.log(`Calling ${event.toolName} with:`, event.args);
break;
case 'tool_execution_update':
// Streaming progress for long-running tools (e.g., bash output)
console.log(`Progress:`, event.partialResult.content);
break;
case 'tool_execution_end':
if (event.isError) {
console.error(`Tool failed:`, event.result);
} else {
console.log(`Tool result:`, event.result.output);
console.log(`Tool result:`, event.result.content);
}
break;
@ -947,11 +954,13 @@ const weatherTool: AgentTool<typeof weatherSchema, { temp: number }> = {
name: 'get_weather',
description: 'Get current weather for a city',
parameters: weatherSchema,
execute: async (toolCallId, args) => {
execute: async (toolCallId, args, signal, onUpdate) => {
// args is fully typed: { city: string, units: 'celsius' | 'fahrenheit' }
// signal: AbortSignal for cancellation
// onUpdate: Optional callback for streaming progress (emits tool_execution_update events)
const temp = Math.round(Math.random() * 30);
return {
output: `Temperature in ${args.city}: ${temp}°${args.units[0].toUpperCase()}`,
content: [{ type: 'text', text: `Temperature in ${args.city}: ${temp}°${args.units[0].toUpperCase()}` }],
details: { temp }
};
}
@ -973,6 +982,36 @@ const chartTool: AgentTool<typeof Type.Object({ data: Type.Array(Type.Number())
};
}
};
// Tools can stream progress via the onUpdate callback (emits tool_execution_update events)
const bashTool: AgentTool<typeof Type.Object({ command: Type.String() }), { exitCode: number }> = {
label: 'Run Bash',
name: 'bash',
description: 'Execute a bash command',
parameters: Type.Object({ command: Type.String() }),
execute: async (toolCallId, args, signal, onUpdate) => {
let output = '';
const child = spawn('bash', ['-c', args.command]);
child.stdout.on('data', (data) => {
output += data.toString();
// Stream partial output to UI via tool_execution_update events
onUpdate?.({
content: [{ type: 'text', text: output }],
details: { exitCode: -1 } // Not finished yet
});
});
const exitCode = await new Promise<number>((resolve) => {
child.on('close', resolve);
});
return {
content: [{ type: 'text', text: output }],
details: { exitCode }
};
}
};
```
### Validation and Error Handling

View file

@ -243,7 +243,7 @@ async function executeToolCalls<T>(
args: toolCall.arguments,
});
let resultOrError: AgentToolResult<T> | string;
let result: AgentToolResult<T>;
let isError = false;
try {
@ -252,10 +252,21 @@ async function executeToolCalls<T>(
// Validate arguments using shared validation function
const validatedArgs = validateToolArguments(tool, toolCall);
// Execute with validated, typed arguments
resultOrError = await tool.execute(toolCall.id, validatedArgs, signal);
// Execute with validated, typed arguments, passing update callback
result = await tool.execute(toolCall.id, validatedArgs, signal, (partialResult) => {
stream.push({
type: "tool_execution_update",
toolCallId: toolCall.id,
toolName: toolCall.name,
args: toolCall.arguments,
partialResult,
});
});
} catch (e) {
resultOrError = e instanceof Error ? e.message : String(e);
result = {
content: [{ type: "text", text: e instanceof Error ? e.message : String(e) }],
details: {} as T,
};
isError = true;
}
@ -263,20 +274,16 @@ async function executeToolCalls<T>(
type: "tool_execution_end",
toolCallId: toolCall.id,
toolName: toolCall.name,
result: resultOrError,
result,
isError,
});
// Convert result to content blocks
const content: ToolResultMessage<T>["content"] =
typeof resultOrError === "string" ? [{ type: "text", text: resultOrError }] : resultOrError.content;
const toolResultMessage: ToolResultMessage<T> = {
role: "toolResult",
toolCallId: toolCall.id,
toolName: toolCall.name,
content,
details: typeof resultOrError === "string" ? ({} as T) : resultOrError.details,
content: result.content,
details: result.details,
isError,
timestamp: Date.now(),
};

View file

@ -1,3 +1,11 @@
export { agentLoop, agentLoopContinue } from "./agent-loop.js";
export * from "./tools/index.js";
export type { AgentContext, AgentEvent, AgentLoopConfig, AgentTool, AgentToolResult, QueuedMessage } from "./types.js";
export type {
AgentContext,
AgentEvent,
AgentLoopConfig,
AgentTool,
AgentToolResult,
AgentToolUpdateCallback,
QueuedMessage,
} from "./types.js";

View file

@ -18,6 +18,9 @@ export interface AgentToolResult<T> {
details: T;
}
// Callback for streaming tool execution updates
export type AgentToolUpdateCallback<T = any> = (partialResult: AgentToolResult<T>) => void;
// AgentTool extends Tool but adds the execute function
export interface AgentTool<TParameters extends TSchema = TSchema, TDetails = any> extends Tool<TParameters> {
// A human-readable label for the tool to be displayed in UI
@ -26,6 +29,7 @@ export interface AgentTool<TParameters extends TSchema = TSchema, TDetails = any
toolCallId: string,
params: Static<TParameters>,
signal?: AbortSignal,
onUpdate?: AgentToolUpdateCallback<TDetails>,
) => Promise<AgentToolResult<TDetails>>;
}
@ -50,12 +54,20 @@ export type AgentEvent =
| { type: "message_end"; message: Message }
// Emitted when a tool execution starts
| { type: "tool_execution_start"; toolCallId: string; toolName: string; args: any }
// Emitted when a tool execution produces output (streaming)
| {
type: "tool_execution_update";
toolCallId: string;
toolName: string;
args: any;
partialResult: AgentToolResult<any>;
}
// Emitted when a tool execution completes
| {
type: "tool_execution_end";
toolCallId: string;
toolName: string;
result: AgentToolResult<any> | string;
result: AgentToolResult<any>;
isError: boolean;
}
// Emitted when a full turn completes

View file

@ -2,8 +2,14 @@
## [Unreleased]
### Added
- **Streaming bash output**: Bash tool now streams output in real-time during execution. The TUI displays live progress with the last 5 lines visible (expandable with ctrl+o). ([#44](https://github.com/badlogic/pi-mono/issues/44))
### Changed
- **Tool output display**: When collapsed, tool output now shows the last N lines instead of the first N lines, making streaming output more useful.
- Updated `@mariozechner/pi-ai` with X-Initiator header support for GitHub Copilot, ensuring agent calls are not deducted from quota. ([#200](https://github.com/badlogic/pi-mono/pull/200) by [@kim0](https://github.com/kim0))
### Fixed

View file

@ -553,6 +553,7 @@ Events are streamed to stdout as JSON lines during agent operation. Events do NO
| `message_update` | Streaming update (text/thinking/toolcall deltas) |
| `message_end` | Message completes |
| `tool_execution_start` | Tool begins execution |
| `tool_execution_update` | Tool execution progress (streaming output) |
| `tool_execution_end` | Tool completes |
| `auto_compaction_start` | Auto-compaction begins |
| `auto_compaction_end` | Auto-compaction completes |
@ -645,9 +646,9 @@ Example streaming a text response:
{"type":"message_update","message":{...},"assistantMessageEvent":{"type":"text_end","contentIndex":0,"content":"Hello world","partial":{...}}}
```
### tool_execution_start / tool_execution_end
### tool_execution_start / tool_execution_update / tool_execution_end
Emitted when a tool begins and completes execution.
Emitted when a tool begins, streams progress, and completes execution.
```json
{
@ -658,6 +659,23 @@ Emitted when a tool begins and completes execution.
}
```
During execution, `tool_execution_update` events stream partial results (e.g., bash output as it arrives):
```json
{
"type": "tool_execution_update",
"toolCallId": "call_abc123",
"toolName": "bash",
"args": {"command": "ls -la"},
"partialResult": {
"content": [{"type": "text", "text": "partial output so far..."}],
"details": {"truncation": null, "fullOutputPath": null}
}
}
```
When complete:
```json
{
"type": "tool_execution_end",
@ -671,7 +689,7 @@ Emitted when a tool begins and completes execution.
}
```
Use `toolCallId` to correlate `tool_execution_start` with `tool_execution_end`.
Use `toolCallId` to correlate events. The `partialResult` in `tool_execution_update` contains the accumulated output so far (not just the delta), allowing clients to simply replace their display on each update.
### auto_compaction_start / auto_compaction_end

View file

@ -35,6 +35,7 @@ export const bashTool: AgentTool<typeof bashSchema> = {
_toolCallId: string,
{ command, timeout }: { command: string; timeout?: number },
signal?: AbortSignal,
onUpdate?,
) => {
return new Promise((resolve, reject) => {
const { shell, args } = getShellConfig();
@ -92,6 +93,20 @@ export const bashTool: AgentTool<typeof bashSchema> = {
const removed = chunks.shift()!;
chunksBytes -= removed.length;
}
// Stream partial output to callback (truncated rolling buffer)
if (onUpdate) {
const fullBuffer = Buffer.concat(chunks);
const fullText = fullBuffer.toString("utf-8");
const truncation = truncateTail(fullText);
onUpdate({
content: [{ type: "text", text: truncation.content || "" }],
details: {
truncation: truncation.truncated ? truncation : undefined,
fullOutputPath: tempFilePath,
},
});
}
};
// Collect stdout and stderr together

View file

@ -44,6 +44,7 @@ export class ToolExecutionComponent extends Container {
private args: any;
private expanded = false;
private showImages: boolean;
private isPartial = false;
private result?: {
content: Array<{ type: string; text?: string; data?: string; mimeType?: string }>;
isError: boolean;
@ -66,12 +67,16 @@ export class ToolExecutionComponent extends Container {
this.updateDisplay();
}
updateResult(result: {
content: Array<{ type: string; text?: string; data?: string; mimeType?: string }>;
details?: any;
isError: boolean;
}): void {
updateResult(
result: {
content: Array<{ type: string; text?: string; data?: string; mimeType?: string }>;
details?: any;
isError: boolean;
},
isPartial = false,
): void {
this.result = result;
this.isPartial = isPartial;
this.updateDisplay();
}
@ -86,11 +91,11 @@ export class ToolExecutionComponent extends Container {
}
private updateDisplay(): void {
const bgFn = this.result
? this.result.isError
const bgFn = this.isPartial
? (text: string) => theme.bg("toolPendingBg", text)
: this.result?.isError
? (text: string) => theme.bg("toolErrorBg", text)
: (text: string) => theme.bg("toolSuccessBg", text)
: (text: string) => theme.bg("toolPendingBg", text);
: (text: string) => theme.bg("toolSuccessBg", text);
this.contentText.setCustomBgFn(bgFn);
this.contentText.setText(this.formatToolExecution());
@ -164,13 +169,15 @@ export class ToolExecutionComponent extends Container {
if (output) {
const lines = output.split("\n");
const maxLines = this.expanded ? lines.length : 5;
const displayLines = lines.slice(0, maxLines);
const remaining = lines.length - maxLines;
const skipped = Math.max(0, lines.length - maxLines);
const displayLines = lines.slice(-maxLines);
text += "\n\n" + displayLines.map((line: string) => theme.fg("toolOutput", line)).join("\n");
if (remaining > 0) {
text += theme.fg("toolOutput", `\n... (${remaining} more lines)`);
if (skipped > 0) {
text += theme.fg("toolOutput", `\n\n... (${skipped} earlier lines)`);
}
text +=
(skipped > 0 ? "\n" : "\n\n") +
displayLines.map((line: string) => theme.fg("toolOutput", line)).join("\n");
}
// Show truncation warning at the bottom (outside collapsed area)

View file

@ -755,18 +755,19 @@ export class InteractiveMode {
break;
}
case "tool_execution_update": {
const component = this.pendingTools.get(event.toolCallId);
if (component) {
component.updateResult({ ...event.partialResult, isError: false }, true);
this.ui.requestRender();
}
break;
}
case "tool_execution_end": {
const component = this.pendingTools.get(event.toolCallId);
if (component) {
const resultData =
typeof event.result === "string"
? {
content: [{ type: "text" as const, text: event.result }],
details: undefined,
isError: event.isError,
}
: { content: event.result.content, details: event.result.details, isError: event.isError };
component.updateResult(resultData);
component.updateResult({ ...event.result, isError: event.isError });
this.pendingTools.delete(event.toolCallId);
this.ui.requestRender();
}
@ -993,11 +994,7 @@ export class InteractiveMode {
} else if (message.role === "toolResult") {
const component = this.pendingTools.get(message.toolCallId);
if (component) {
component.updateResult({
content: message.content,
details: message.details,
isError: message.isError,
});
component.updateResult(message);
this.pendingTools.delete(message.toolCallId);
}
}