WIP: Refactor agent package - not compiling

- Renamed AppMessage to AgentMessage throughout
- New agent-loop.ts with AgentLoopContext, AgentLoopConfig
- Removed transport abstraction, Agent now takes streamFn directly
- Extracted streamProxy to proxy.ts utility
- Removed agent-loop from pi-ai (now in agent package)
- Updated consumers (coding-agent, mom) for AgentMessage rename
- Tests updated but some consumers still need migration

Known issues:
- AgentTool, AgentToolResult not exported from pi-ai
- Attachment not exported from pi-agent-core
- ProviderTransport removed but still referenced
- messageTransformer -> convertToLlm migration incomplete
- CustomMessages declaration merging not working properly
This commit is contained in:
Mario Zechner 2025-12-28 09:23:38 +01:00
parent f7ef44dc38
commit a055fd4481
32 changed files with 1312 additions and 2009 deletions

View file

@ -0,0 +1,398 @@
/**
* Agent loop that works with AgentMessage throughout.
* Transforms to Message[] only at the LLM call boundary.
*/
import {
type AssistantMessage,
type Context,
EventStream,
streamSimple,
type ToolResultMessage,
validateToolArguments,
} from "@mariozechner/pi-ai";
import type {
AgentContext,
AgentEvent,
AgentLoopConfig,
AgentMessage,
AgentTool,
AgentToolResult,
StreamFn,
} from "./types.js";
/**
* Start an agent loop with a new prompt message.
* The prompt is added to the context and events are emitted for it.
*/
export function agentLoop(
prompt: AgentMessage,
context: AgentContext,
config: AgentLoopConfig,
signal?: AbortSignal,
streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]> {
const stream = createAgentStream();
(async () => {
const newMessages: AgentMessage[] = [prompt];
const currentContext: AgentContext = {
...context,
messages: [...context.messages, prompt],
};
stream.push({ type: "agent_start" });
stream.push({ type: "turn_start" });
stream.push({ type: "message_start", message: prompt });
stream.push({ type: "message_end", message: prompt });
await runLoop(currentContext, newMessages, config, signal, stream, streamFn);
})();
return stream;
}
/**
* Continue an agent loop from the current context without adding a new message.
* Used for retries - context already has user message or tool results.
*
* **Important:** The last message in context must convert to a `user` or `toolResult` message
* via `convertToLlm`. If it doesn't, the LLM provider will reject the request.
* This cannot be validated here since `convertToLlm` is only called once per turn.
*/
export function agentLoopContinue(
context: AgentContext,
config: AgentLoopConfig,
signal?: AbortSignal,
streamFn?: StreamFn,
): EventStream<AgentEvent, AgentMessage[]> {
if (context.messages.length === 0) {
throw new Error("Cannot continue: no messages in context");
}
if (context.messages[context.messages.length - 1].role === "assistant") {
throw new Error("Cannot continue from message role: assistant");
}
const stream = createAgentStream();
(async () => {
const newMessages: AgentMessage[] = [];
const currentContext: AgentContext = { ...context };
stream.push({ type: "agent_start" });
stream.push({ type: "turn_start" });
await runLoop(currentContext, newMessages, config, signal, stream, streamFn);
})();
return stream;
}
function createAgentStream(): EventStream<AgentEvent, AgentMessage[]> {
return new EventStream<AgentEvent, AgentMessage[]>(
(event: AgentEvent) => event.type === "agent_end",
(event: AgentEvent) => (event.type === "agent_end" ? event.messages : []),
);
}
/**
* Main loop logic shared by agentLoop and agentLoopContinue.
*/
async function runLoop(
currentContext: AgentContext,
newMessages: AgentMessage[],
config: AgentLoopConfig,
signal: AbortSignal | undefined,
stream: EventStream<AgentEvent, AgentMessage[]>,
streamFn?: StreamFn,
): Promise<void> {
let hasMoreToolCalls = true;
let firstTurn = true;
let queuedMessages: AgentMessage[] = (await config.getQueuedMessages?.()) || [];
let queuedAfterTools: AgentMessage[] | null = null;
while (hasMoreToolCalls || queuedMessages.length > 0) {
if (!firstTurn) {
stream.push({ type: "turn_start" });
} else {
firstTurn = false;
}
// Process queued messages (inject before next assistant response)
if (queuedMessages.length > 0) {
for (const message of queuedMessages) {
stream.push({ type: "message_start", message });
stream.push({ type: "message_end", message });
currentContext.messages.push(message);
newMessages.push(message);
}
queuedMessages = [];
}
// Stream assistant response
const message = await streamAssistantResponse(currentContext, config, signal, stream, streamFn);
newMessages.push(message);
if (message.stopReason === "error" || message.stopReason === "aborted") {
stream.push({ type: "turn_end", message, toolResults: [] });
stream.push({ type: "agent_end", messages: newMessages });
stream.end(newMessages);
return;
}
// Check for tool calls
const toolCalls = message.content.filter((c) => c.type === "toolCall");
hasMoreToolCalls = toolCalls.length > 0;
const toolResults: ToolResultMessage[] = [];
if (hasMoreToolCalls) {
const toolExecution = await executeToolCalls(
currentContext.tools,
message,
signal,
stream,
config.getQueuedMessages,
);
toolResults.push(...toolExecution.toolResults);
queuedAfterTools = toolExecution.queuedMessages ?? null;
for (const result of toolResults) {
currentContext.messages.push(result);
newMessages.push(result);
}
}
stream.push({ type: "turn_end", message, toolResults });
// Get queued messages after turn completes
if (queuedAfterTools && queuedAfterTools.length > 0) {
queuedMessages = queuedAfterTools;
queuedAfterTools = null;
} else {
queuedMessages = (await config.getQueuedMessages?.()) || [];
}
}
stream.push({ type: "agent_end", messages: newMessages });
stream.end(newMessages);
}
/**
* Stream an assistant response from the LLM.
* This is where AgentMessage[] gets transformed to Message[] for the LLM.
*/
async function streamAssistantResponse(
context: AgentContext,
config: AgentLoopConfig,
signal: AbortSignal | undefined,
stream: EventStream<AgentEvent, AgentMessage[]>,
streamFn?: StreamFn,
): Promise<AssistantMessage> {
// Apply context transform if configured (AgentMessage[] → AgentMessage[])
let messages = context.messages;
if (config.transformContext) {
messages = await config.transformContext(messages, signal);
}
// Convert to LLM-compatible messages (AgentMessage[] → Message[])
const llmMessages = await config.convertToLlm(messages);
// Build LLM context
const llmContext: Context = {
systemPrompt: context.systemPrompt,
messages: llmMessages,
tools: context.tools,
};
const streamFunction = streamFn || streamSimple;
// Resolve API key (important for expiring tokens)
const resolvedApiKey =
(config.getApiKey ? await config.getApiKey(config.model.provider) : undefined) || config.apiKey;
const response = streamFunction(config.model, llmContext, {
...config,
apiKey: resolvedApiKey,
signal,
});
let partialMessage: AssistantMessage | null = null;
let addedPartial = false;
for await (const event of response) {
switch (event.type) {
case "start":
partialMessage = event.partial;
context.messages.push(partialMessage);
addedPartial = true;
stream.push({ type: "message_start", message: { ...partialMessage } });
break;
case "text_start":
case "text_delta":
case "text_end":
case "thinking_start":
case "thinking_delta":
case "thinking_end":
case "toolcall_start":
case "toolcall_delta":
case "toolcall_end":
if (partialMessage) {
partialMessage = event.partial;
context.messages[context.messages.length - 1] = partialMessage;
stream.push({
type: "message_update",
assistantMessageEvent: event,
message: { ...partialMessage },
});
}
break;
case "done":
case "error": {
const finalMessage = await response.result();
if (addedPartial) {
context.messages[context.messages.length - 1] = finalMessage;
} else {
context.messages.push(finalMessage);
}
if (!addedPartial) {
stream.push({ type: "message_start", message: { ...finalMessage } });
}
stream.push({ type: "message_end", message: finalMessage });
return finalMessage;
}
}
}
return await response.result();
}
/**
* Execute tool calls from an assistant message.
*/
async function executeToolCalls(
tools: AgentTool<any>[] | undefined,
assistantMessage: AssistantMessage,
signal: AbortSignal | undefined,
stream: EventStream<AgentEvent, AgentMessage[]>,
getQueuedMessages?: AgentLoopConfig["getQueuedMessages"],
): Promise<{ toolResults: ToolResultMessage[]; queuedMessages?: AgentMessage[] }> {
const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall");
const results: ToolResultMessage[] = [];
let queuedMessages: AgentMessage[] | undefined;
for (let index = 0; index < toolCalls.length; index++) {
const toolCall = toolCalls[index];
const tool = tools?.find((t) => t.name === toolCall.name);
stream.push({
type: "tool_execution_start",
toolCallId: toolCall.id,
toolName: toolCall.name,
args: toolCall.arguments,
});
let result: AgentToolResult<any>;
let isError = false;
try {
if (!tool) throw new Error(`Tool ${toolCall.name} not found`);
const validatedArgs = validateToolArguments(tool, toolCall);
result = await tool.execute(toolCall.id, validatedArgs, signal, (partialResult) => {
stream.push({
type: "tool_execution_update",
toolCallId: toolCall.id,
toolName: toolCall.name,
args: toolCall.arguments,
partialResult,
});
});
} catch (e) {
result = {
content: [{ type: "text", text: e instanceof Error ? e.message : String(e) }],
details: {},
};
isError = true;
}
stream.push({
type: "tool_execution_end",
toolCallId: toolCall.id,
toolName: toolCall.name,
result,
isError,
});
const toolResultMessage: ToolResultMessage = {
role: "toolResult",
toolCallId: toolCall.id,
toolName: toolCall.name,
content: result.content,
details: result.details,
isError,
timestamp: Date.now(),
};
results.push(toolResultMessage);
stream.push({ type: "message_start", message: toolResultMessage });
stream.push({ type: "message_end", message: toolResultMessage });
// Check for queued messages - skip remaining tools if user interrupted
if (getQueuedMessages) {
const queued = await getQueuedMessages();
if (queued.length > 0) {
queuedMessages = queued;
const remainingCalls = toolCalls.slice(index + 1);
for (const skipped of remainingCalls) {
results.push(skipToolCall(skipped, stream));
}
break;
}
}
}
return { toolResults: results, queuedMessages };
}
function skipToolCall(
toolCall: Extract<AssistantMessage["content"][number], { type: "toolCall" }>,
stream: EventStream<AgentEvent, AgentMessage[]>,
): ToolResultMessage {
const result: AgentToolResult<any> = {
content: [{ type: "text", text: "Skipped due to queued user message." }],
details: {},
};
stream.push({
type: "tool_execution_start",
toolCallId: toolCall.id,
toolName: toolCall.name,
args: toolCall.arguments,
});
stream.push({
type: "tool_execution_end",
toolCallId: toolCall.id,
toolName: toolCall.name,
result,
isError: true,
});
const toolResultMessage: ToolResultMessage = {
role: "toolResult",
toolCallId: toolCall.id,
toolName: toolCall.name,
content: result.content,
details: {},
isError: true,
timestamp: Date.now(),
};
stream.push({ type: "message_start", message: toolResultMessage });
stream.push({ type: "message_end", message: toolResultMessage });
return toolResultMessage;
}

View file

@ -1,64 +1,66 @@
import type { ImageContent, Message, QueuedMessage, ReasoningEffort, TextContent } from "@mariozechner/pi-ai";
import { getModel } from "@mariozechner/pi-ai";
import type { AgentTransport } from "./transports/types.js";
import type { AgentEvent, AgentState, AppMessage, Attachment, ThinkingLevel } from "./types.js";
/**
* Agent class that uses the agent-loop directly.
* No transport abstraction - calls streamSimple via the loop.
*/
import {
getModel,
type ImageContent,
type Message,
type Model,
type ReasoningEffort,
streamSimple,
type TextContent,
} from "@mariozechner/pi-ai";
import { agentLoop, agentLoopContinue } from "./agent-loop.js";
import type {
AgentContext,
AgentEvent,
AgentLoopConfig,
AgentMessage,
AgentState,
AgentTool,
StreamFn,
ThinkingLevel,
} from "./types.js";
/**
* Default message transformer: Keep only LLM-compatible messages, strip app-specific fields.
* Converts attachments to proper content blocks (images ImageContent, documents TextContent).
* Default convertToLlm: Keep only LLM-compatible messages, convert attachments.
*/
function defaultMessageTransformer(messages: AppMessage[]): Message[] {
return messages
.filter((m) => {
// Only keep standard LLM message roles
return m.role === "user" || m.role === "assistant" || m.role === "toolResult";
})
.map((m) => {
if (m.role === "user") {
const { attachments, ...rest } = m as any;
// If no attachments, return as-is
if (!attachments || attachments.length === 0) {
return rest as Message;
}
// Convert attachments to content blocks
const content = Array.isArray(rest.content) ? [...rest.content] : [{ type: "text", text: rest.content }];
for (const attachment of attachments as Attachment[]) {
// Add image blocks for image attachments
if (attachment.type === "image") {
content.push({
type: "image",
data: attachment.content,
mimeType: attachment.mimeType,
} as ImageContent);
}
// Add text blocks for documents with extracted text
else if (attachment.type === "document" && attachment.extractedText) {
content.push({
type: "text",
text: `\n\n[Document: ${attachment.fileName}]\n${attachment.extractedText}`,
isDocument: true,
} as TextContent);
}
}
return { ...rest, content } as Message;
}
return m as Message;
});
function defaultConvertToLlm(messages: AgentMessage[]): Message[] {
return messages.filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult");
}
export interface AgentOptions {
initialState?: Partial<AgentState>;
transport: AgentTransport;
// Transform app messages to LLM-compatible messages before sending to transport
messageTransformer?: (messages: AppMessage[]) => Message[] | Promise<Message[]>;
// Called before each LLM call inside the agent loop - can modify messages (e.g., for pruning)
preprocessor?: (messages: Message[]) => Promise<Message[]>;
// Queue mode: "all" = send all queued messages at once, "one-at-a-time" = send one queued message per turn
/**
* Converts AgentMessage[] to LLM-compatible Message[] before each LLM call.
* Default filters to user/assistant/toolResult and converts attachments.
*/
convertToLlm?: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
/**
* Optional transform applied to context before convertToLlm.
* Use for context pruning, injecting external context, etc.
*/
transformContext?: (messages: AgentMessage[], signal?: AbortSignal) => Promise<AgentMessage[]>;
/**
* Queue mode: "all" = send all queued messages at once, "one-at-a-time" = one per turn
*/
queueMode?: "all" | "one-at-a-time";
/**
* Custom stream function (for proxy backends, etc.). Default uses streamSimple.
*/
streamFn?: StreamFn;
/**
* Resolves an API key dynamically for each LLM call.
* Useful for expiring tokens (e.g., GitHub Copilot OAuth).
*/
getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
}
export class Agent {
@ -73,22 +75,25 @@ export class Agent {
pendingToolCalls: new Set<string>(),
error: undefined,
};
private listeners = new Set<(e: AgentEvent) => void>();
private abortController?: AbortController;
private transport: AgentTransport;
private messageTransformer: (messages: AppMessage[]) => Message[] | Promise<Message[]>;
private preprocessor?: (messages: Message[]) => Promise<Message[]>;
private messageQueue: Array<QueuedMessage<AppMessage>> = [];
private convertToLlm: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
private transformContext?: (messages: AgentMessage[], signal?: AbortSignal) => Promise<AgentMessage[]>;
private messageQueue: AgentMessage[] = [];
private queueMode: "all" | "one-at-a-time";
private streamFn: StreamFn;
private getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
private runningPrompt?: Promise<void>;
private resolveRunningPrompt?: () => void;
constructor(opts: AgentOptions) {
constructor(opts: AgentOptions = {}) {
this._state = { ...this._state, ...opts.initialState };
this.transport = opts.transport;
this.messageTransformer = opts.messageTransformer || defaultMessageTransformer;
this.preprocessor = opts.preprocessor;
this.convertToLlm = opts.convertToLlm || defaultConvertToLlm;
this.transformContext = opts.transformContext;
this.queueMode = opts.queueMode || "one-at-a-time";
this.streamFn = opts.streamFn || streamSimple;
this.getApiKey = opts.getApiKey;
}
get state(): AgentState {
@ -100,12 +105,12 @@ export class Agent {
return () => this.listeners.delete(fn);
}
// State mutators - update internal state without emitting events
// State mutators
setSystemPrompt(v: string) {
this._state.systemPrompt = v;
}
setModel(m: typeof this._state.model) {
setModel(m: Model<any>) {
this._state.model = m;
}
@ -121,25 +126,20 @@ export class Agent {
return this.queueMode;
}
setTools(t: typeof this._state.tools) {
setTools(t: AgentTool<any>[]) {
this._state.tools = t;
}
replaceMessages(ms: AppMessage[]) {
replaceMessages(ms: AgentMessage[]) {
this._state.messages = ms.slice();
}
appendMessage(m: AppMessage) {
appendMessage(m: AgentMessage) {
this._state.messages = [...this._state.messages, m];
}
async queueMessage(m: AppMessage) {
// Transform message and queue it for injection at next turn
const transformed = await this.messageTransformer([m]);
this.messageQueue.push({
original: m,
llm: transformed[0], // undefined if filtered out
});
queueMessage(m: AgentMessage) {
this.messageQueue.push(m);
}
clearMessageQueue() {
@ -154,17 +154,10 @@ export class Agent {
this.abortController?.abort();
}
/**
* Returns a promise that resolves when the current prompt completes.
* Returns immediately resolved promise if no prompt is running.
*/
waitForIdle(): Promise<void> {
return this.runningPrompt ?? Promise.resolve();
}
/**
* Clear all messages and state. Call abort() first if a prompt is in flight.
*/
reset() {
this._state.messages = [];
this._state.isStreaming = false;
@ -174,99 +167,53 @@ export class Agent {
this.messageQueue = [];
}
/** Send a prompt to the agent with an AppMessage. */
async prompt(message: AppMessage): Promise<void>;
/** Send a prompt to the agent with text and optional attachments. */
async prompt(input: string, attachments?: Attachment[]): Promise<void>;
async prompt(input: string | AppMessage, attachments?: Attachment[]) {
/** Send a prompt with an AgentMessage */
async prompt(message: AgentMessage): Promise<void>;
async prompt(input: string, images?: ImageContent[]): Promise<void>;
async prompt(input: string | AgentMessage, images?: ImageContent[]) {
const model = this._state.model;
if (!model) {
throw new Error("No model configured");
}
if (!model) throw new Error("No model configured");
let userMessage: AppMessage;
let userMessage: AgentMessage;
if (typeof input === "string") {
// Build user message from text + attachments
const content: Array<TextContent | ImageContent> = [{ type: "text", text: input }];
if (attachments?.length) {
for (const a of attachments) {
if (a.type === "image") {
content.push({ type: "image", data: a.content, mimeType: a.mimeType });
} else if (a.type === "document" && a.extractedText) {
content.push({
type: "text",
text: `\n\n[Document: ${a.fileName}]\n${a.extractedText}`,
isDocument: true,
} as TextContent);
}
}
if (images && images.length > 0) {
content.push(...images);
}
userMessage = {
role: "user",
content,
attachments: attachments?.length ? attachments : undefined,
timestamp: Date.now(),
};
} else {
// Use provided AppMessage directly
userMessage = input;
}
await this._runAgentLoop(userMessage);
await this._runLoop(userMessage);
}
/**
* Continue from the current context without adding a new user message.
* Used for retry after overflow recovery when context already has user message or tool results.
*/
/** Continue from current context (for retry after overflow) */
async continue() {
const messages = this._state.messages;
if (messages.length === 0) {
throw new Error("No messages to continue from");
}
const lastMessage = messages[messages.length - 1];
if (lastMessage.role !== "user" && lastMessage.role !== "toolResult") {
throw new Error(`Cannot continue from message role: ${lastMessage.role}`);
if (messages[messages.length - 1].role === "assistant") {
throw new Error("Cannot continue from message role: assistant");
}
await this._runAgentLoopContinue();
await this._runLoop(undefined);
}
/**
* Internal: Run the agent loop with a new user message.
* Run the agent loop.
* If userMessage is provided, starts a new conversation turn.
* Otherwise, continues from existing context.
*/
private async _runAgentLoop(userMessage: AppMessage) {
const { llmMessages, cfg } = await this._prepareRun();
// Transform user message (e.g., HookMessage -> user message)
const [transformedUserMessage] = await this.messageTransformer([userMessage]);
const events = this.transport.run(llmMessages, transformedUserMessage, cfg, this.abortController!.signal);
await this._processEvents(events);
}
/**
* Internal: Continue the agent loop from current context.
*/
private async _runAgentLoopContinue() {
const { llmMessages, cfg } = await this._prepareRun();
const events = this.transport.continue(llmMessages, cfg, this.abortController!.signal);
await this._processEvents(events);
}
/**
* Prepare for running the agent loop.
*/
private async _prepareRun() {
private async _runLoop(userMessage?: AgentMessage) {
const model = this._state.model;
if (!model) {
throw new Error("No model configured");
}
if (!model) throw new Error("No model configured");
this.runningPrompt = new Promise<void>((resolve) => {
this.resolveRunningPrompt = resolve;
@ -282,88 +229,89 @@ export class Agent {
? undefined
: this._state.thinkingLevel === "minimal"
? "low"
: this._state.thinkingLevel;
: (this._state.thinkingLevel as ReasoningEffort);
const cfg = {
const context: AgentContext = {
systemPrompt: this._state.systemPrompt,
messages: this._state.messages.slice(),
tools: this._state.tools,
};
const config: AgentLoopConfig = {
model,
reasoning,
preprocessor: this.preprocessor,
getQueuedMessages: async <T>() => {
convertToLlm: this.convertToLlm,
transformContext: this.transformContext,
getApiKey: this.getApiKey,
getQueuedMessages: async () => {
if (this.queueMode === "one-at-a-time") {
if (this.messageQueue.length > 0) {
const first = this.messageQueue[0];
this.messageQueue = this.messageQueue.slice(1);
return [first] as QueuedMessage<T>[];
return [first];
}
return [];
} else {
const queued = this.messageQueue.slice();
this.messageQueue = [];
return queued as QueuedMessage<T>[];
return queued;
}
},
};
const llmMessages = await this.messageTransformer(this._state.messages);
return { llmMessages, cfg, model };
}
/**
* Process events from the transport.
*/
private async _processEvents(events: AsyncIterable<AgentEvent>) {
const model = this._state.model!;
const generatedMessages: AppMessage[] = [];
let partial: AppMessage | null = null;
let partial: AgentMessage | null = null;
try {
for await (const ev of events) {
switch (ev.type) {
case "message_start": {
partial = ev.message as AppMessage;
this._state.streamMessage = ev.message as Message;
const stream = userMessage
? agentLoop(userMessage, context, config, this.abortController.signal, this.streamFn)
: agentLoopContinue(context, config, this.abortController.signal, this.streamFn);
for await (const event of stream) {
// Update internal state based on events
switch (event.type) {
case "message_start":
partial = event.message;
this._state.streamMessage = event.message;
break;
}
case "message_update": {
partial = ev.message;
this._state.streamMessage = ev.message;
case "message_update":
partial = event.message;
this._state.streamMessage = event.message;
break;
}
case "message_end": {
case "message_end":
partial = null;
this._state.streamMessage = null;
this.appendMessage(ev.message);
generatedMessages.push(ev.message);
this.appendMessage(event.message);
break;
}
case "tool_execution_start": {
const s = new Set(this._state.pendingToolCalls);
s.add(ev.toolCallId);
s.add(event.toolCallId);
this._state.pendingToolCalls = s;
break;
}
case "tool_execution_end": {
const s = new Set(this._state.pendingToolCalls);
s.delete(ev.toolCallId);
s.delete(event.toolCallId);
this._state.pendingToolCalls = s;
break;
}
case "turn_end": {
if (ev.message.role === "assistant" && ev.message.errorMessage) {
this._state.error = ev.message.errorMessage;
case "turn_end":
if (event.message.role === "assistant" && (event.message as any).errorMessage) {
this._state.error = (event.message as any).errorMessage;
}
break;
}
case "agent_end": {
case "agent_end":
this._state.streamMessage = null;
break;
}
}
this.emit(ev as AgentEvent);
// Emit to listeners
this.emit(event);
}
// Handle any remaining partial message
@ -375,8 +323,7 @@ export class Agent {
(c.type === "toolCall" && c.name.trim().length > 0),
);
if (!onlyEmpty) {
this.appendMessage(partial as AppMessage);
generatedMessages.push(partial as AppMessage);
this.appendMessage(partial);
} else {
if (this.abortController?.signal.aborted) {
throw new Error("Request was aborted");
@ -384,7 +331,7 @@ export class Agent {
}
}
} catch (err: any) {
const msg: Message = {
const errorMsg: AgentMessage = {
role: "assistant",
content: [{ type: "text", text: "" }],
api: model.api,
@ -401,10 +348,11 @@ export class Agent {
stopReason: this.abortController?.signal.aborted ? "aborted" : "error",
errorMessage: err?.message || String(err),
timestamp: Date.now(),
};
this.appendMessage(msg as AppMessage);
generatedMessages.push(msg as AppMessage);
} as AgentMessage;
this.appendMessage(errorMsg);
this._state.error = err?.message || String(err);
this.emit({ type: "agent_end", messages: [errorMsg] });
} finally {
this._state.isStreaming = false;
this._state.streamMessage = null;

View file

@ -1,22 +1,6 @@
// Core Agent
export { Agent, type AgentOptions } from "./agent.js";
// Transports
export {
type AgentRunConfig,
type AgentTransport,
AppTransport,
type AppTransportOptions,
ProviderTransport,
type ProviderTransportOptions,
type ProxyAssistantMessageEvent,
} from "./transports/index.js";
export * from "./agent.js";
// Loop functions
export * from "./agent-loop.js";
// Types
export type {
AgentEvent,
AgentState,
AppMessage,
Attachment,
CustomMessages,
ThinkingLevel,
UserMessageWithAttachments,
} from "./types.js";
export * from "./types.js";

340
packages/agent/src/proxy.ts Normal file
View file

@ -0,0 +1,340 @@
/**
* Proxy stream function for apps that route LLM calls through a server.
* The server manages auth and proxies requests to LLM providers.
*/
import {
type AssistantMessage,
type AssistantMessageEvent,
type Context,
EventStream,
type Model,
type SimpleStreamOptions,
type StopReason,
type ToolCall,
} from "@mariozechner/pi-ai";
// Internal import for JSON parsing utility
import { parseStreamingJson } from "@mariozechner/pi-ai/dist/utils/json-parse.js";
// Create stream class matching ProxyMessageEventStream
class ProxyMessageEventStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
constructor() {
super(
(event) => event.type === "done" || event.type === "error",
(event) => {
if (event.type === "done") return event.message;
if (event.type === "error") return event.error;
throw new Error("Unexpected event type");
},
);
}
}
/**
* Proxy event types - server sends these with partial field stripped to reduce bandwidth.
*/
export type ProxyAssistantMessageEvent =
| { type: "start" }
| { type: "text_start"; contentIndex: number }
| { type: "text_delta"; contentIndex: number; delta: string }
| { type: "text_end"; contentIndex: number; contentSignature?: string }
| { type: "thinking_start"; contentIndex: number }
| { type: "thinking_delta"; contentIndex: number; delta: string }
| { type: "thinking_end"; contentIndex: number; contentSignature?: string }
| { type: "toolcall_start"; contentIndex: number; id: string; toolName: string }
| { type: "toolcall_delta"; contentIndex: number; delta: string }
| { type: "toolcall_end"; contentIndex: number }
| {
type: "done";
reason: Extract<StopReason, "stop" | "length" | "toolUse">;
usage: AssistantMessage["usage"];
}
| {
type: "error";
reason: Extract<StopReason, "aborted" | "error">;
errorMessage?: string;
usage: AssistantMessage["usage"];
};
export interface ProxyStreamOptions extends SimpleStreamOptions {
/** Auth token for the proxy server */
authToken: string;
/** Proxy server URL (e.g., "https://genai.example.com") */
proxyUrl: string;
}
/**
* Stream function that proxies through a server instead of calling LLM providers directly.
* The server strips the partial field from delta events to reduce bandwidth.
* We reconstruct the partial message client-side.
*
* Use this as the `streamFn` option when creating an Agent that needs to go through a proxy.
*
* @example
* ```typescript
* const agent = new Agent({
* streamFn: (model, context, options) =>
* streamProxy(model, context, {
* ...options,
* authToken: await getAuthToken(),
* proxyUrl: "https://genai.example.com",
* }),
* });
* ```
*/
export function streamProxy(model: Model<any>, context: Context, options: ProxyStreamOptions): ProxyMessageEventStream {
const stream = new ProxyMessageEventStream();
(async () => {
// Initialize the partial message that we'll build up from events
const partial: AssistantMessage = {
role: "assistant",
stopReason: "stop",
content: [],
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
};
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
const abortHandler = () => {
if (reader) {
reader.cancel("Request aborted by user").catch(() => {});
}
};
if (options.signal) {
options.signal.addEventListener("abort", abortHandler);
}
try {
const response = await fetch(`${options.proxyUrl}/api/stream`, {
method: "POST",
headers: {
Authorization: `Bearer ${options.authToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model,
context,
options: {
temperature: options.temperature,
maxTokens: options.maxTokens,
reasoning: options.reasoning,
},
}),
signal: options.signal,
});
if (!response.ok) {
let errorMessage = `Proxy error: ${response.status} ${response.statusText}`;
try {
const errorData = (await response.json()) as { error?: string };
if (errorData.error) {
errorMessage = `Proxy error: ${errorData.error}`;
}
} catch {
// Couldn't parse error response
}
throw new Error(errorMessage);
}
reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (options.signal?.aborted) {
throw new Error("Request aborted by user");
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6).trim();
if (data) {
const proxyEvent = JSON.parse(data) as ProxyAssistantMessageEvent;
const event = processProxyEvent(proxyEvent, partial);
if (event) {
stream.push(event);
}
}
}
}
}
if (options.signal?.aborted) {
throw new Error("Request aborted by user");
}
stream.end();
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
const reason = options.signal?.aborted ? "aborted" : "error";
partial.stopReason = reason;
partial.errorMessage = errorMessage;
stream.push({
type: "error",
reason,
error: partial,
});
stream.end();
} finally {
if (options.signal) {
options.signal.removeEventListener("abort", abortHandler);
}
}
})();
return stream;
}
/**
* Process a proxy event and update the partial message.
*/
function processProxyEvent(
proxyEvent: ProxyAssistantMessageEvent,
partial: AssistantMessage,
): AssistantMessageEvent | undefined {
switch (proxyEvent.type) {
case "start":
return { type: "start", partial };
case "text_start":
partial.content[proxyEvent.contentIndex] = { type: "text", text: "" };
return { type: "text_start", contentIndex: proxyEvent.contentIndex, partial };
case "text_delta": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "text") {
content.text += proxyEvent.delta;
return {
type: "text_delta",
contentIndex: proxyEvent.contentIndex,
delta: proxyEvent.delta,
partial,
};
}
throw new Error("Received text_delta for non-text content");
}
case "text_end": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "text") {
content.textSignature = proxyEvent.contentSignature;
return {
type: "text_end",
contentIndex: proxyEvent.contentIndex,
content: content.text,
partial,
};
}
throw new Error("Received text_end for non-text content");
}
case "thinking_start":
partial.content[proxyEvent.contentIndex] = { type: "thinking", thinking: "" };
return { type: "thinking_start", contentIndex: proxyEvent.contentIndex, partial };
case "thinking_delta": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "thinking") {
content.thinking += proxyEvent.delta;
return {
type: "thinking_delta",
contentIndex: proxyEvent.contentIndex,
delta: proxyEvent.delta,
partial,
};
}
throw new Error("Received thinking_delta for non-thinking content");
}
case "thinking_end": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "thinking") {
content.thinkingSignature = proxyEvent.contentSignature;
return {
type: "thinking_end",
contentIndex: proxyEvent.contentIndex,
content: content.thinking,
partial,
};
}
throw new Error("Received thinking_end for non-thinking content");
}
case "toolcall_start":
partial.content[proxyEvent.contentIndex] = {
type: "toolCall",
id: proxyEvent.id,
name: proxyEvent.toolName,
arguments: {},
partialJson: "",
} satisfies ToolCall & { partialJson: string } as ToolCall;
return { type: "toolcall_start", contentIndex: proxyEvent.contentIndex, partial };
case "toolcall_delta": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "toolCall") {
(content as any).partialJson += proxyEvent.delta;
content.arguments = parseStreamingJson((content as any).partialJson) || {};
partial.content[proxyEvent.contentIndex] = { ...content }; // Trigger reactivity
return {
type: "toolcall_delta",
contentIndex: proxyEvent.contentIndex,
delta: proxyEvent.delta,
partial,
};
}
throw new Error("Received toolcall_delta for non-toolCall content");
}
case "toolcall_end": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "toolCall") {
delete (content as any).partialJson;
return {
type: "toolcall_end",
contentIndex: proxyEvent.contentIndex,
toolCall: content,
partial,
};
}
return undefined;
}
case "done":
partial.stopReason = proxyEvent.reason;
partial.usage = proxyEvent.usage;
return { type: "done", reason: proxyEvent.reason, message: partial };
case "error":
partial.stopReason = proxyEvent.reason;
partial.errorMessage = proxyEvent.errorMessage;
partial.usage = proxyEvent.usage;
return { type: "error", reason: proxyEvent.reason, error: partial };
default: {
const _exhaustiveCheck: never = proxyEvent;
console.warn(`Unhandled proxy event type: ${(proxyEvent as any).type}`);
return undefined;
}
}
}

View file

@ -1,397 +0,0 @@
import type {
AgentContext,
AgentLoopConfig,
Api,
AssistantMessage,
AssistantMessageEvent,
Context,
Message,
Model,
SimpleStreamOptions,
ToolCall,
UserMessage,
} from "@mariozechner/pi-ai";
import { agentLoop, agentLoopContinue } from "@mariozechner/pi-ai";
import { AssistantMessageEventStream } from "@mariozechner/pi-ai/dist/utils/event-stream.js";
import { parseStreamingJson } from "@mariozechner/pi-ai/dist/utils/json-parse.js";
import type { ProxyAssistantMessageEvent } from "./proxy-types.js";
import type { AgentRunConfig, AgentTransport } from "./types.js";
/**
* Stream function that proxies through a server instead of calling providers directly.
* The server strips the partial field from delta events to reduce bandwidth.
* We reconstruct the partial message client-side.
*/
function streamSimpleProxy(
model: Model<any>,
context: Context,
options: SimpleStreamOptions & { authToken: string },
proxyUrl: string,
): AssistantMessageEventStream {
const stream = new AssistantMessageEventStream();
(async () => {
// Initialize the partial message that we'll build up from events
const partial: AssistantMessage = {
role: "assistant",
stopReason: "stop",
content: [],
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
};
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
// Set up abort handler to cancel the reader
const abortHandler = () => {
if (reader) {
reader.cancel("Request aborted by user").catch(() => {});
}
};
if (options.signal) {
options.signal.addEventListener("abort", abortHandler);
}
try {
const response = await fetch(`${proxyUrl}/api/stream`, {
method: "POST",
headers: {
Authorization: `Bearer ${options.authToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model,
context,
options: {
temperature: options.temperature,
maxTokens: options.maxTokens,
reasoning: options.reasoning,
// Don't send apiKey or signal - those are added server-side
},
}),
signal: options.signal,
});
if (!response.ok) {
let errorMessage = `Proxy error: ${response.status} ${response.statusText}`;
try {
const errorData = (await response.json()) as { error?: string };
if (errorData.error) {
errorMessage = `Proxy error: ${errorData.error}`;
}
} catch {
// Couldn't parse error response, use default message
}
throw new Error(errorMessage);
}
// Parse SSE stream
reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
// Check if aborted after reading
if (options.signal?.aborted) {
throw new Error("Request aborted by user");
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6).trim();
if (data) {
const proxyEvent = JSON.parse(data) as ProxyAssistantMessageEvent;
let event: AssistantMessageEvent | undefined;
// Handle different event types
// Server sends events with partial for non-delta events,
// and without partial for delta events
switch (proxyEvent.type) {
case "start":
event = { type: "start", partial };
break;
case "text_start":
partial.content[proxyEvent.contentIndex] = {
type: "text",
text: "",
};
event = { type: "text_start", contentIndex: proxyEvent.contentIndex, partial };
break;
case "text_delta": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "text") {
content.text += proxyEvent.delta;
event = {
type: "text_delta",
contentIndex: proxyEvent.contentIndex,
delta: proxyEvent.delta,
partial,
};
} else {
throw new Error("Received text_delta for non-text content");
}
break;
}
case "text_end": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "text") {
content.textSignature = proxyEvent.contentSignature;
event = {
type: "text_end",
contentIndex: proxyEvent.contentIndex,
content: content.text,
partial,
};
} else {
throw new Error("Received text_end for non-text content");
}
break;
}
case "thinking_start":
partial.content[proxyEvent.contentIndex] = {
type: "thinking",
thinking: "",
};
event = { type: "thinking_start", contentIndex: proxyEvent.contentIndex, partial };
break;
case "thinking_delta": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "thinking") {
content.thinking += proxyEvent.delta;
event = {
type: "thinking_delta",
contentIndex: proxyEvent.contentIndex,
delta: proxyEvent.delta,
partial,
};
} else {
throw new Error("Received thinking_delta for non-thinking content");
}
break;
}
case "thinking_end": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "thinking") {
content.thinkingSignature = proxyEvent.contentSignature;
event = {
type: "thinking_end",
contentIndex: proxyEvent.contentIndex,
content: content.thinking,
partial,
};
} else {
throw new Error("Received thinking_end for non-thinking content");
}
break;
}
case "toolcall_start":
partial.content[proxyEvent.contentIndex] = {
type: "toolCall",
id: proxyEvent.id,
name: proxyEvent.toolName,
arguments: {},
partialJson: "",
} satisfies ToolCall & { partialJson: string } as ToolCall;
event = { type: "toolcall_start", contentIndex: proxyEvent.contentIndex, partial };
break;
case "toolcall_delta": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "toolCall") {
(content as any).partialJson += proxyEvent.delta;
content.arguments = parseStreamingJson((content as any).partialJson) || {};
event = {
type: "toolcall_delta",
contentIndex: proxyEvent.contentIndex,
delta: proxyEvent.delta,
partial,
};
partial.content[proxyEvent.contentIndex] = { ...content }; // Trigger reactivity
} else {
throw new Error("Received toolcall_delta for non-toolCall content");
}
break;
}
case "toolcall_end": {
const content = partial.content[proxyEvent.contentIndex];
if (content?.type === "toolCall") {
delete (content as any).partialJson;
event = {
type: "toolcall_end",
contentIndex: proxyEvent.contentIndex,
toolCall: content,
partial,
};
}
break;
}
case "done":
partial.stopReason = proxyEvent.reason;
partial.usage = proxyEvent.usage;
event = { type: "done", reason: proxyEvent.reason, message: partial };
break;
case "error":
partial.stopReason = proxyEvent.reason;
partial.errorMessage = proxyEvent.errorMessage;
partial.usage = proxyEvent.usage;
event = { type: "error", reason: proxyEvent.reason, error: partial };
break;
default: {
// Exhaustive check
const _exhaustiveCheck: never = proxyEvent;
console.warn(`Unhandled event type: ${(proxyEvent as any).type}`);
break;
}
}
// Push the event to stream
if (event) {
stream.push(event);
} else {
throw new Error("Failed to create event from proxy event");
}
}
}
}
}
// Check if aborted after reading
if (options.signal?.aborted) {
throw new Error("Request aborted by user");
}
stream.end();
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
partial.stopReason = options.signal?.aborted ? "aborted" : "error";
partial.errorMessage = errorMessage;
stream.push({
type: "error",
reason: partial.stopReason,
error: partial,
} satisfies AssistantMessageEvent);
stream.end();
} finally {
// Clean up abort handler
if (options.signal) {
options.signal.removeEventListener("abort", abortHandler);
}
}
})();
return stream;
}
export interface AppTransportOptions {
/**
* Proxy server URL. The server manages user accounts and proxies requests to LLM providers.
* Example: "https://genai.mariozechner.at"
*/
proxyUrl: string;
/**
* Function to retrieve auth token for the proxy server.
* The token is used for user authentication and authorization.
*/
getAuthToken: () => Promise<string> | string;
}
/**
* Transport that uses an app server with user authentication tokens.
* The server manages user accounts and proxies requests to LLM providers.
*/
export class AppTransport implements AgentTransport {
private options: AppTransportOptions;
constructor(options: AppTransportOptions) {
this.options = options;
}
private async getStreamFn(authToken: string) {
return <TApi extends Api>(model: Model<TApi>, context: Context, options?: SimpleStreamOptions) => {
return streamSimpleProxy(
model,
context,
{
...options,
authToken,
},
this.options.proxyUrl,
);
};
}
private buildContext(messages: Message[], cfg: AgentRunConfig): AgentContext {
return {
systemPrompt: cfg.systemPrompt,
messages,
tools: cfg.tools,
};
}
private buildLoopConfig(cfg: AgentRunConfig): AgentLoopConfig {
return {
model: cfg.model,
reasoning: cfg.reasoning,
getQueuedMessages: cfg.getQueuedMessages,
};
}
async *run(messages: Message[], userMessage: Message, cfg: AgentRunConfig, signal?: AbortSignal) {
const authToken = await this.options.getAuthToken();
if (!authToken) {
throw new Error("Auth token is required for AppTransport");
}
const streamFn = await this.getStreamFn(authToken);
const context = this.buildContext(messages, cfg);
const pc = this.buildLoopConfig(cfg);
for await (const ev of agentLoop(userMessage as unknown as UserMessage, context, pc, signal, streamFn as any)) {
yield ev;
}
}
async *continue(messages: Message[], cfg: AgentRunConfig, signal?: AbortSignal) {
const authToken = await this.options.getAuthToken();
if (!authToken) {
throw new Error("Auth token is required for AppTransport");
}
const streamFn = await this.getStreamFn(authToken);
const context = this.buildContext(messages, cfg);
const pc = this.buildLoopConfig(cfg);
for await (const ev of agentLoopContinue(context, pc, signal, streamFn as any)) {
yield ev;
}
}
}

View file

@ -1,86 +0,0 @@
import {
type AgentContext,
type AgentLoopConfig,
agentLoop,
agentLoopContinue,
type Message,
type UserMessage,
} from "@mariozechner/pi-ai";
import type { AgentRunConfig, AgentTransport } from "./types.js";
export interface ProviderTransportOptions {
/**
* Function to retrieve API key for a given provider.
* If not provided, transport will try to use environment variables.
*/
getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
/**
* Optional CORS proxy URL for browser environments.
* If provided, all requests will be routed through this proxy.
* Format: "https://proxy.example.com"
*/
corsProxyUrl?: string;
}
/**
* Transport that calls LLM providers directly.
* Optionally routes calls through a CORS proxy if configured.
*/
export class ProviderTransport implements AgentTransport {
private options: ProviderTransportOptions;
constructor(options: ProviderTransportOptions = {}) {
this.options = options;
}
private getModel(cfg: AgentRunConfig) {
let model = cfg.model;
if (this.options.corsProxyUrl && cfg.model.baseUrl) {
model = {
...cfg.model,
baseUrl: `${this.options.corsProxyUrl}/?url=${encodeURIComponent(cfg.model.baseUrl)}`,
};
}
return model;
}
private buildContext(messages: Message[], cfg: AgentRunConfig): AgentContext {
return {
systemPrompt: cfg.systemPrompt,
messages,
tools: cfg.tools,
};
}
private buildLoopConfig(model: AgentRunConfig["model"], cfg: AgentRunConfig): AgentLoopConfig {
return {
model,
reasoning: cfg.reasoning,
// Resolve API key per assistant response (important for expiring OAuth tokens)
getApiKey: this.options.getApiKey,
getQueuedMessages: cfg.getQueuedMessages,
preprocessor: cfg.preprocessor,
};
}
async *run(messages: Message[], userMessage: Message, cfg: AgentRunConfig, signal?: AbortSignal) {
const model = this.getModel(cfg);
const context = this.buildContext(messages, cfg);
const pc = this.buildLoopConfig(model, cfg);
for await (const ev of agentLoop(userMessage as unknown as UserMessage, context, pc, signal)) {
yield ev;
}
}
async *continue(messages: Message[], cfg: AgentRunConfig, signal?: AbortSignal) {
const model = this.getModel(cfg);
const context = this.buildContext(messages, cfg);
const pc = this.buildLoopConfig(model, cfg);
for await (const ev of agentLoopContinue(context, pc, signal)) {
yield ev;
}
}
}

View file

@ -1,4 +0,0 @@
export { AppTransport, type AppTransportOptions } from "./AppTransport.js";
export { ProviderTransport, type ProviderTransportOptions } from "./ProviderTransport.js";
export type { ProxyAssistantMessageEvent } from "./proxy-types.js";
export type { AgentRunConfig, AgentTransport } from "./types.js";

View file

@ -1,20 +0,0 @@
import type { StopReason, Usage } from "@mariozechner/pi-ai";
/**
* Event types emitted by the proxy server.
* The server strips the `partial` field from delta events to reduce bandwidth.
* Clients reconstruct the partial message from these events.
*/
export type ProxyAssistantMessageEvent =
| { type: "start" }
| { type: "text_start"; contentIndex: number }
| { type: "text_delta"; contentIndex: number; delta: string }
| { type: "text_end"; contentIndex: number; contentSignature?: string }
| { type: "thinking_start"; contentIndex: number }
| { type: "thinking_delta"; contentIndex: number; delta: string }
| { type: "thinking_end"; contentIndex: number; contentSignature?: string }
| { type: "toolcall_start"; contentIndex: number; id: string; toolName: string }
| { type: "toolcall_delta"; contentIndex: number; delta: string }
| { type: "toolcall_end"; contentIndex: number }
| { type: "done"; reason: Extract<StopReason, "stop" | "length" | "toolUse">; usage: Usage }
| { type: "error"; reason: Extract<StopReason, "aborted" | "error">; errorMessage: string; usage: Usage };

View file

@ -1,34 +0,0 @@
import type { AgentEvent, AgentTool, Message, Model, QueuedMessage, ReasoningEffort } from "@mariozechner/pi-ai";
/**
* The minimal configuration needed to run an agent turn.
*/
export interface AgentRunConfig {
systemPrompt: string;
tools: AgentTool<any>[];
model: Model<any>;
reasoning?: ReasoningEffort;
getQueuedMessages?: <T>() => Promise<QueuedMessage<T>[]>;
/** Called before each LLM call - can modify messages (e.g., for pruning) */
preprocessor?: (messages: Message[]) => Promise<Message[]>;
}
/**
* Transport interface for executing agent turns.
* Transports handle the communication with LLM providers,
* abstracting away the details of API calls, proxies, etc.
*
* Events yielded must match the @mariozechner/pi-ai AgentEvent types.
*/
export interface AgentTransport {
/** Run with a new user message */
run(
messages: Message[],
userMessage: Message,
config: AgentRunConfig,
signal?: AbortSignal,
): AsyncIterable<AgentEvent>;
/** Continue from current context (no new user message) */
continue(messages: Message[], config: AgentRunConfig, signal?: AbortSignal): AsyncIterable<AgentEvent>;
}

View file

@ -1,26 +1,83 @@
import type {
AgentTool,
AssistantMessage,
AssistantMessageEvent,
ImageContent,
Message,
Model,
SimpleStreamOptions,
streamSimple,
TextContent,
Tool,
ToolResultMessage,
UserMessage,
} from "@mariozechner/pi-ai";
import type { Static, TSchema } from "@sinclair/typebox";
export type StreamFn = typeof streamSimple;
/**
* Attachment type definition.
* Processing is done by consumers (e.g., document extraction in web-ui).
* Configuration for the agent loop.
*/
export interface Attachment {
id: string;
type: "image" | "document";
fileName: string;
mimeType: string;
size: number;
content: string; // base64 encoded (without data URL prefix)
extractedText?: string; // For documents
preview?: string; // base64 image preview
export interface AgentLoopConfig extends SimpleStreamOptions {
model: Model<any>;
/**
* Converts AgentMessage[] to LLM-compatible Message[] before each LLM call.
*
* Each AgentMessage must be converted to a UserMessage, AssistantMessage, or ToolResultMessage
* that the LLM can understand. AgentMessages that cannot be converted (e.g., UI-only notifications,
* status messages) should be filtered out.
*
* @example
* ```typescript
* convertToLlm: (messages) => messages.flatMap(m => {
* if (m.role === "hookMessage") {
* // Convert custom message to user message
* return [{ role: "user", content: m.content, timestamp: m.timestamp }];
* }
* if (m.role === "notification") {
* // Filter out UI-only messages
* return [];
* }
* // Pass through standard LLM messages
* return [m];
* })
* ```
*/
convertToLlm: (messages: AgentMessage[]) => Message[] | Promise<Message[]>;
/**
* Optional transform applied to the context before `convertToLlm`.
*
* Use this for operations that work at the AgentMessage level:
* - Context window management (pruning old messages)
* - Injecting context from external sources
*
* @example
* ```typescript
* transformContext: async (messages) => {
* if (estimateTokens(messages) > MAX_TOKENS) {
* return pruneOldMessages(messages);
* }
* return messages;
* }
* ```
*/
transformContext?: (messages: AgentMessage[], signal?: AbortSignal) => Promise<AgentMessage[]>;
/**
* Resolves an API key dynamically for each LLM call.
*
* Useful for short-lived OAuth tokens (e.g., GitHub Copilot) that may expire
* during long-running tool execution phases.
*/
getApiKey?: (provider: string) => Promise<string | undefined> | string | undefined;
/**
* Returns queued messages to inject into the conversation.
*
* Called after each turn to check for user interruptions or injected messages.
* If messages are returned, they're added to the context before the next LLM call.
*/
getQueuedMessages?: () => Promise<AgentMessage[]>;
}
/**
@ -29,11 +86,6 @@ export interface Attachment {
*/
export type ThinkingLevel = "off" | "minimal" | "low" | "medium" | "high" | "xhigh";
/**
* User message with optional attachments.
*/
export type UserMessageWithAttachments = UserMessage & { attachments?: Attachment[] };
/**
* Extensible interface for custom app messages.
* Apps can extend via declaration merging:
@ -41,27 +93,23 @@ export type UserMessageWithAttachments = UserMessage & { attachments?: Attachmen
* @example
* ```typescript
* declare module "@mariozechner/agent" {
* interface CustomMessages {
* interface CustomAgentMessages {
* artifact: ArtifactMessage;
* notification: NotificationMessage;
* }
* }
* ```
*/
export interface CustomMessages {
export interface CustomAgentMessages {
// Empty by default - apps extend via declaration merging
}
/**
* AppMessage: Union of LLM messages + attachments + custom messages.
* AgentMessage: Union of LLM messages + custom messages.
* This abstraction allows apps to add custom message types while maintaining
* type safety and compatibility with the base LLM messages.
*/
export type AppMessage =
| AssistantMessage
| UserMessageWithAttachments
| Message // Includes ToolResultMessage
| CustomMessages[keyof CustomMessages];
export type AgentMessage = Message | CustomAgentMessages[keyof CustomAgentMessages];
/**
* Agent state containing all configuration and conversation data.
@ -71,13 +119,42 @@ export interface AgentState {
model: Model<any>;
thinkingLevel: ThinkingLevel;
tools: AgentTool<any>[];
messages: AppMessage[]; // Can include attachments + custom message types
messages: AgentMessage[]; // Can include attachments + custom message types
isStreaming: boolean;
streamMessage: AppMessage | null;
streamMessage: AgentMessage | null;
pendingToolCalls: Set<string>;
error?: string;
}
export interface AgentToolResult<T> {
// Content blocks supporting text and images
content: (TextContent | ImageContent)[];
// Details to be displayed in a UI or logged
details: T;
}
// Callback for streaming tool execution updates
export type AgentToolUpdateCallback<T = any> = (partialResult: AgentToolResult<T>) => void;
// AgentTool extends Tool but adds the execute function
export interface AgentTool<TParameters extends TSchema = TSchema, TDetails = any> extends Tool<TParameters> {
// A human-readable label for the tool to be displayed in UI
label: string;
execute: (
toolCallId: string,
params: Static<TParameters>,
signal?: AbortSignal,
onUpdate?: AgentToolUpdateCallback<TDetails>,
) => Promise<AgentToolResult<TDetails>>;
}
// AgentContext is like Context but uses AgentTool
export interface AgentContext {
systemPrompt: string;
messages: Message[];
tools?: AgentTool<any>[];
}
/**
* Events emitted by the Agent for UI updates.
* These events provide fine-grained lifecycle information for messages, turns, and tool executions.
@ -85,15 +162,15 @@ export interface AgentState {
export type AgentEvent =
// Agent lifecycle
| { type: "agent_start" }
| { type: "agent_end"; messages: AppMessage[] }
| { type: "agent_end"; messages: AgentMessage[] }
// Turn lifecycle - a turn is one assistant response + any tool calls/results
| { type: "turn_start" }
| { type: "turn_end"; message: AppMessage; toolResults: ToolResultMessage[] }
| { type: "turn_end"; message: AgentMessage; toolResults: ToolResultMessage[] }
// Message lifecycle - emitted for user, assistant, and toolResult messages
| { type: "message_start"; message: AppMessage }
| { type: "message_start"; message: AgentMessage }
// Only emitted for assistant messages during streaming
| { type: "message_update"; message: AppMessage; assistantMessageEvent: AssistantMessageEvent }
| { type: "message_end"; message: AppMessage }
| { type: "message_update"; message: AgentMessage; assistantMessageEvent: AssistantMessageEvent }
| { type: "message_end"; message: AgentMessage }
// Tool execution lifecycle
| { type: "tool_execution_start"; toolCallId: string; toolName: string; args: any }
| { type: "tool_execution_update"; toolCallId: string; toolName: string; args: any; partialResult: any }