import { type AssistantMessage, type AssistantMessageEvent, EventStream, getModel } from "@mariozechner/pi-ai"; import { describe, expect, it } from "vitest"; import { Agent } from "../src/index.js"; // Mock stream that mimics AssistantMessageEventStream class MockAssistantStream extends EventStream { constructor() { super( (event) => event.type === "done" || event.type === "error", (event) => { if (event.type === "done") return event.message; if (event.type === "error") return event.error; throw new Error("Unexpected event type"); }, ); } } function createAssistantMessage(text: string): AssistantMessage { return { role: "assistant", content: [{ type: "text", text }], api: "openai-responses", provider: "openai", model: "mock", usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0, cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, }, stopReason: "stop", timestamp: Date.now(), }; } describe("Agent", () => { it("should create an agent instance with default state", () => { const agent = new Agent(); expect(agent.state).toBeDefined(); expect(agent.state.systemPrompt).toBe(""); expect(agent.state.model).toBeDefined(); expect(agent.state.thinkingLevel).toBe("off"); expect(agent.state.tools).toEqual([]); expect(agent.state.messages).toEqual([]); expect(agent.state.isStreaming).toBe(false); expect(agent.state.streamMessage).toBe(null); expect(agent.state.pendingToolCalls).toEqual(new Set()); expect(agent.state.error).toBeUndefined(); }); it("should create an agent instance with custom initial state", () => { const customModel = getModel("openai", "gpt-4o-mini"); const agent = new Agent({ initialState: { systemPrompt: "You are a helpful assistant.", model: customModel, thinkingLevel: "low", }, }); expect(agent.state.systemPrompt).toBe("You are a helpful assistant."); expect(agent.state.model).toBe(customModel); expect(agent.state.thinkingLevel).toBe("low"); }); it("should subscribe to events", () => { const agent = new Agent(); let eventCount = 0; const unsubscribe = agent.subscribe((_event) => { eventCount++; }); // No initial event on subscribe expect(eventCount).toBe(0); // State mutators don't emit events agent.setSystemPrompt("Test prompt"); expect(eventCount).toBe(0); expect(agent.state.systemPrompt).toBe("Test prompt"); // Unsubscribe should work unsubscribe(); agent.setSystemPrompt("Another prompt"); expect(eventCount).toBe(0); // Should not increase }); it("should update state with mutators", () => { const agent = new Agent(); // Test setSystemPrompt agent.setSystemPrompt("Custom prompt"); expect(agent.state.systemPrompt).toBe("Custom prompt"); // Test setModel const newModel = getModel("google", "gemini-2.5-flash"); agent.setModel(newModel); expect(agent.state.model).toBe(newModel); // Test setThinkingLevel agent.setThinkingLevel("high"); expect(agent.state.thinkingLevel).toBe("high"); // Test setTools const tools = [{ name: "test", description: "test tool" } as any]; agent.setTools(tools); expect(agent.state.tools).toBe(tools); // Test replaceMessages const messages = [{ role: "user" as const, content: "Hello", timestamp: Date.now() }]; agent.replaceMessages(messages); expect(agent.state.messages).toEqual(messages); expect(agent.state.messages).not.toBe(messages); // Should be a copy // Test appendMessage const newMessage = { role: "assistant" as const, content: [{ type: "text" as const, text: "Hi" }] }; agent.appendMessage(newMessage as any); expect(agent.state.messages).toHaveLength(2); expect(agent.state.messages[1]).toBe(newMessage); // Test clearMessages agent.clearMessages(); expect(agent.state.messages).toEqual([]); }); it("should support steering message queue", async () => { const agent = new Agent(); const message = { role: "user" as const, content: "Steering message", timestamp: Date.now() }; agent.steer(message); // The message is queued but not yet in state.messages expect(agent.state.messages).not.toContainEqual(message); }); it("should support follow-up message queue", async () => { const agent = new Agent(); const message = { role: "user" as const, content: "Follow-up message", timestamp: Date.now() }; agent.followUp(message); // The message is queued but not yet in state.messages expect(agent.state.messages).not.toContainEqual(message); }); it("should handle abort controller", () => { const agent = new Agent(); // Should not throw even if nothing is running expect(() => agent.abort()).not.toThrow(); }); it("should throw when prompt() called while streaming", async () => { let abortSignal: AbortSignal | undefined; const agent = new Agent({ // Use a stream function that responds to abort streamFn: (_model, _context, options) => { abortSignal = options?.signal; const stream = new MockAssistantStream(); queueMicrotask(() => { stream.push({ type: "start", partial: createAssistantMessage("") }); // Check abort signal periodically const checkAbort = () => { if (abortSignal?.aborted) { stream.push({ type: "error", reason: "aborted", error: createAssistantMessage("Aborted") }); } else { setTimeout(checkAbort, 5); } }; checkAbort(); }); return stream; }, }); // Start first prompt (don't await, it will block until abort) const firstPrompt = agent.prompt("First message"); // Wait a tick for isStreaming to be set await new Promise((resolve) => setTimeout(resolve, 10)); expect(agent.state.isStreaming).toBe(true); // Second prompt should reject await expect(agent.prompt("Second message")).rejects.toThrow( "Agent is already processing a prompt. Use steer() or followUp() to queue messages, or wait for completion.", ); // Cleanup - abort to stop the stream agent.abort(); await firstPrompt.catch(() => {}); // Ignore abort error }); it("should throw when continue() called while streaming", async () => { let abortSignal: AbortSignal | undefined; const agent = new Agent({ streamFn: (_model, _context, options) => { abortSignal = options?.signal; const stream = new MockAssistantStream(); queueMicrotask(() => { stream.push({ type: "start", partial: createAssistantMessage("") }); const checkAbort = () => { if (abortSignal?.aborted) { stream.push({ type: "error", reason: "aborted", error: createAssistantMessage("Aborted") }); } else { setTimeout(checkAbort, 5); } }; checkAbort(); }); return stream; }, }); // Start first prompt const firstPrompt = agent.prompt("First message"); await new Promise((resolve) => setTimeout(resolve, 10)); expect(agent.state.isStreaming).toBe(true); // continue() should reject await expect(agent.continue()).rejects.toThrow( "Agent is already processing. Wait for completion before continuing.", ); // Cleanup agent.abort(); await firstPrompt.catch(() => {}); }); });