import { streamSimple } from "../stream.js";
import type { AssistantMessage, Context, Message, ToolResultMessage, UserMessage } from "../types.js";
import { EventStream } from "../utils/event-stream.js";
import { validateToolArguments } from "../utils/validation.js";
import type { AgentContext, AgentEvent, AgentLoopConfig, AgentTool, AgentToolResult, QueuedMessage } from "./types.js";

// NOTE(review): the generic type arguments in this file (EventStream<...>, AgentTool<...>,
// AgentToolResult<...>, ToolResultMessage<...>) were reconstructed from how the values are used
// here — confirm their arity against ./types.js and ../utils/event-stream.js.

/**
 * Starts an agent run for `prompt` and returns an event stream describing its progress.
 *
 * The run itself executes asynchronously after this function has returned. Each turn:
 *   1. injects any queued messages obtained from `config.getQueuedMessages`,
 *   2. streams one assistant response,
 *   3. executes the tool calls contained in that response (if any).
 * Turns repeat while the last response contained tool calls or queued messages are pending.
 * A response whose `stopReason` is `"error"` or `"aborted"` stops the run immediately.
 *
 * The caller's `context.messages` array is copied, not mutated.
 *
 * @param prompt   User message that starts the run.
 * @param context  System prompt, prior messages and tools.
 * @param config   Loop configuration; also spread into the options handed to the stream function.
 * @param signal   Optional abort signal forwarded to the stream function and to tools.
 * @param streamFn Optional replacement for `streamSimple`.
 * @returns A stream that always terminates with an `agent_end` event carrying every message
 *          produced by this run (the prompt included).
 */
export function agentLoop(
  prompt: UserMessage,
  context: AgentContext,
  config: AgentLoopConfig,
  signal?: AbortSignal,
  streamFn?: typeof streamSimple,
): EventStream<AgentEvent, AgentContext["messages"]> {
  const stream = new EventStream<AgentEvent, AgentContext["messages"]>(
    (event: AgentEvent) => event.type === "agent_end",
    (event: AgentEvent) => (event.type === "agent_end" ? event.messages : []),
  );

  // Fire-and-forget: the caller consumes `stream` while this runs.
  void (async () => {
    // Messages generated during this run, reported with `agent_end`.
    const newMessages: AgentContext["messages"] = [prompt];

    try {
      stream.push({ type: "agent_start" });
      stream.push({ type: "turn_start" });
      stream.push({ type: "message_start", message: prompt });
      stream.push({ type: "message_end", message: prompt });

      // Work on a private copy so the caller's message array is left untouched.
      const currentContext: AgentContext = {
        ...context,
        messages: [...context.messages, prompt],
      };

      // Starts as true so the first turn always runs.
      let hasMoreToolCalls = true;
      let firstTurn = true;
      let queuedMessages: QueuedMessage[] = (await config.getQueuedMessages?.()) ?? [];

      while (hasMoreToolCalls || queuedMessages.length > 0) {
        // `turn_start` for the first turn was already emitted above.
        if (!firstTurn) {
          stream.push({ type: "turn_start" });
        } else {
          firstTurn = false;
        }

        // Inject queued messages before the next assistant response.
        if (queuedMessages.length > 0) {
          for (const { original, llm } of queuedMessages) {
            stream.push({ type: "message_start", message: original });
            stream.push({ type: "message_end", message: original });
            // Only the LLM-facing form (when present) enters the conversation.
            if (llm) {
              currentContext.messages.push(llm);
              newMessages.push(llm);
            }
          }
          queuedMessages = [];
        }

        // streamAssistantResponse appends the response to currentContext.messages itself.
        const message = await streamAssistantResponse(currentContext, config, signal, stream, streamFn);
        newMessages.push(message);

        if (message.stopReason === "error" || message.stopReason === "aborted") {
          // Stop the loop; `agent_end` is emitted by the `finally` below.
          stream.push({ type: "turn_end", message, toolResults: [] });
          return;
        }

        const toolCalls = message.content.filter((c) => c.type === "toolCall");
        hasMoreToolCalls = toolCalls.length > 0;

        const toolResults: ToolResultMessage[] = [];
        if (hasMoreToolCalls) {
          toolResults.push(...(await executeToolCalls(currentContext.tools, message, signal, stream)));
          currentContext.messages.push(...toolResults);
          newMessages.push(...toolResults);
        }
        stream.push({ type: "turn_end", message, toolResults });

        // Pick up anything queued while this turn was running.
        queuedMessages = (await config.getQueuedMessages?.()) ?? [];
      }
    } finally {
      // Always terminate the stream — also when something above threw — so that consumers
      // never wait forever. A thrown error still propagates out of this async function.
      stream.push({ type: "agent_end", messages: newMessages });
      stream.end(newMessages);
    }
  })();

  return stream;
}

/**
 * Streams a single assistant response and mirrors it into `context.messages`.
 *
 * While streaming, the latest partial message occupies the last slot of `context.messages`;
 * once the response is complete that slot is replaced by the final message (or the final
 * message is appended if no partial was ever added). `message_start`, `message_update` and
 * `message_end` events are pushed to `stream` along the way.
 *
 * @param context  Conversation state; its `messages` array is mutated.
 * @param config   Supplies the model, the optional `preprocessor`, and the stream options.
 * @param signal   Abort signal forwarded to the preprocessor and the stream function.
 * @param stream   Event stream that receives the message events.
 * @param streamFn Optional replacement for `streamSimple`.
 * @returns The final assistant message reported by the response.
 */
async function streamAssistantResponse(
  context: AgentContext,
  config: AgentLoopConfig,
  signal: AbortSignal | undefined,
  stream: EventStream<AgentEvent, AgentContext["messages"]>,
  streamFn?: typeof streamSimple,
): Promise<AssistantMessage> {
  // Without a preprocessor, use a copy of the messages.
  const processedMessages = config.preprocessor
    ? await config.preprocessor(context.messages, signal)
    : [...context.messages];

  const processedContext: Context = {
    systemPrompt: context.systemPrompt,
    // Tool results are sent without their `details` field.
    messages: processedMessages.map((m) => {
      if (m.role === "toolResult") {
        const { details, ...rest } = m;
        return rest;
      }
      return m;
    }),
    tools: context.tools,
  };

  const streamFunction = streamFn ?? streamSimple;
  const response = await streamFunction(config.model, processedContext, { ...config, signal });

  let partialMessage: AssistantMessage | null = null;
  let addedPartial = false;

  // Stores the final message in the context and announces it. Shared by the normal
  // `done`/`error` path and by the case where the event iterator simply ends.
  const finalize = async (): Promise<AssistantMessage> => {
    const finalMessage = await response.result();
    if (addedPartial) {
      context.messages[context.messages.length - 1] = finalMessage;
    } else {
      context.messages.push(finalMessage);
    }
    stream.push({ type: "message_end", message: finalMessage });
    return finalMessage;
  };

  for await (const event of response) {
    switch (event.type) {
      case "start":
        partialMessage = event.partial;
        context.messages.push(partialMessage);
        addedPartial = true;
        stream.push({ type: "message_start", message: { ...partialMessage } });
        break;

      case "text_start":
      case "text_delta":
      case "text_end":
      case "thinking_start":
      case "thinking_delta":
      case "thinking_end":
      case "toolcall_start":
      case "toolcall_delta":
      case "toolcall_end":
        // Updates are ignored until a `start` event has put a partial message in place.
        if (partialMessage) {
          partialMessage = event.partial;
          context.messages[context.messages.length - 1] = partialMessage;
          stream.push({ type: "message_update", assistantMessageEvent: event, message: { ...partialMessage } });
        }
        break;

      case "done":
      case "error":
        return await finalize();
    }
  }

  // The iterator ended without `done`/`error`: still record and announce the final message
  // so the context never keeps a stale partial.
  return await finalize();
}

/**
 * Executes, one after another, every tool call contained in `assistantMessage`.
 *
 * For each call a `tool_execution_start` event is pushed, the tool is looked up by name, its
 * arguments are validated with `validateToolArguments`, and the tool is executed. Failures
 * (unknown tool, validation error, tool exception) do not abort the batch: they are turned
 * into an error tool result whose text is the error message. Afterwards `tool_execution_end`,
 * `message_start` and `message_end` events are pushed for the result.
 *
 * @param tools            Tools available to the assistant; may be undefined.
 * @param assistantMessage Assistant message whose `toolCall` content items are executed.
 * @param signal           Abort signal forwarded to each tool's `execute`.
 * @param stream           Event stream that receives the execution and message events.
 * @returns One tool result message per tool call, in call order.
 */
async function executeToolCalls<T>(
  tools: AgentTool<any, T>[] | undefined,
  assistantMessage: AssistantMessage,
  signal: AbortSignal | undefined,
  stream: EventStream<AgentEvent, Message[]>,
): Promise<ToolResultMessage<T>[]> {
  const toolCalls = assistantMessage.content.filter((c) => c.type === "toolCall");
  const results: ToolResultMessage<T>[] = [];

  for (const toolCall of toolCalls) {
    const tool = tools?.find((t) => t.name === toolCall.name);

    stream.push({
      type: "tool_execution_start",
      toolCallId: toolCall.id,
      toolName: toolCall.name,
      args: toolCall.arguments,
    });

    // A string holds an error message; anything else is the tool's own result.
    let resultOrError: AgentToolResult<T> | string;
    let isError = false;

    try {
      if (!tool) throw new Error(`Tool ${toolCall.name} not found`);
      const validatedArgs = validateToolArguments(tool, toolCall);
      resultOrError = await tool.execute(toolCall.id, validatedArgs, signal);
    } catch (e) {
      resultOrError = e instanceof Error ? e.message : String(e);
      isError = true;
    }

    stream.push({
      type: "tool_execution_end",
      toolCallId: toolCall.id,
      toolName: toolCall.name,
      result: resultOrError,
      isError,
    });

    // Error messages become a single text content block.
    const content: ToolResultMessage["content"] =
      typeof resultOrError === "string" ? [{ type: "text", text: resultOrError }] : resultOrError.content;

    const toolResultMessage: ToolResultMessage<T> = {
      role: "toolResult",
      toolCallId: toolCall.id,
      toolName: toolCall.name,
      content,
      // Errors carry no tool-specific details; an empty object stands in for them.
      details: typeof resultOrError === "string" ? ({} as T) : resultOrError.details,
      isError,
      timestamp: Date.now(),
    };

    results.push(toolResultMessage);
    stream.push({ type: "message_start", message: toolResultMessage });
    stream.push({ type: "message_end", message: toolResultMessage });
  }

  return results;
}