/** * Tests for AgentSession concurrent prompt guard. */ import { existsSync, mkdirSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { Agent } from "@mariozechner/pi-agent-core"; import { type AssistantMessage, type AssistantMessageEvent, EventStream, getModel } from "@mariozechner/pi-ai"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { AgentSession } from "../src/core/agent-session.js"; import { AuthStorage } from "../src/core/auth-storage.js"; import { ModelRegistry } from "../src/core/model-registry.js"; import { SessionManager } from "../src/core/session-manager.js"; import { SettingsManager } from "../src/core/settings-manager.js"; import { createTestResourceLoader } from "./utilities.js"; // Mock stream that mimics AssistantMessageEventStream class MockAssistantStream extends EventStream { constructor() { super( (event) => event.type === "done" || event.type === "error", (event) => { if (event.type === "done") return event.message; if (event.type === "error") return event.error; throw new Error("Unexpected event type"); }, ); } } function createAssistantMessage(text: string): AssistantMessage { return { role: "assistant", content: [{ type: "text", text }], api: "anthropic-messages", provider: "anthropic", model: "mock", usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0, cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, }, stopReason: "stop", timestamp: Date.now(), }; } describe("AgentSession concurrent prompt guard", () => { let session: AgentSession; let tempDir: string; beforeEach(() => { tempDir = join(tmpdir(), `pi-concurrent-test-${Date.now()}`); mkdirSync(tempDir, { recursive: true }); }); afterEach(async () => { if (session) { session.dispose(); } if (tempDir && existsSync(tempDir)) { rmSync(tempDir, { recursive: true }); } }); function createSession() { const model = getModel("anthropic", "claude-sonnet-4-5")!; let abortSignal: AbortSignal | undefined; // Use a stream function that responds to abort const agent = new Agent({ getApiKey: () => "test-key", initialState: { model, systemPrompt: "Test", tools: [], }, streamFn: (_model, _context, options) => { abortSignal = options?.signal; const stream = new MockAssistantStream(); queueMicrotask(() => { stream.push({ type: "start", partial: createAssistantMessage("") }); const checkAbort = () => { if (abortSignal?.aborted) { stream.push({ type: "error", reason: "aborted", error: createAssistantMessage("Aborted") }); } else { setTimeout(checkAbort, 5); } }; checkAbort(); }); return stream; }, }); const sessionManager = SessionManager.inMemory(); const settingsManager = SettingsManager.create(tempDir, tempDir); const authStorage = AuthStorage.create(join(tempDir, "auth.json")); const modelRegistry = new ModelRegistry(authStorage, tempDir); // Set a runtime API key so validation passes authStorage.setRuntimeApiKey("anthropic", "test-key"); session = new AgentSession({ agent, sessionManager, settingsManager, cwd: tempDir, modelRegistry, resourceLoader: createTestResourceLoader(), }); return session; } it("should throw when prompt() called while streaming", async () => { createSession(); // Start first prompt (don't await, it will block until abort) const firstPrompt = session.prompt("First message"); // Wait a tick for isStreaming to be set await new Promise((resolve) => setTimeout(resolve, 10)); // Verify we're streaming expect(session.isStreaming).toBe(true); // Second prompt should reject await expect(session.prompt("Second message")).rejects.toThrow( "Agent is already processing. Specify streamingBehavior ('steer' or 'followUp') to queue the message.", ); // Cleanup await session.abort(); await firstPrompt.catch(() => {}); // Ignore abort error }); it("should allow steer() while streaming", async () => { createSession(); // Start first prompt const firstPrompt = session.prompt("First message"); await new Promise((resolve) => setTimeout(resolve, 10)); // steer should work while streaming expect(() => session.steer("Steering message")).not.toThrow(); expect(session.pendingMessageCount).toBe(1); // Cleanup await session.abort(); await firstPrompt.catch(() => {}); }); it("should allow followUp() while streaming", async () => { createSession(); // Start first prompt const firstPrompt = session.prompt("First message"); await new Promise((resolve) => setTimeout(resolve, 10)); // followUp should work while streaming expect(() => session.followUp("Follow-up message")).not.toThrow(); expect(session.pendingMessageCount).toBe(1); // Cleanup await session.abort(); await firstPrompt.catch(() => {}); }); it("should allow prompt() after previous completes", async () => { // Create session with a stream that completes immediately const model = getModel("anthropic", "claude-sonnet-4-5")!; const agent = new Agent({ getApiKey: () => "test-key", initialState: { model, systemPrompt: "Test", tools: [], }, streamFn: () => { const stream = new MockAssistantStream(); queueMicrotask(() => { stream.push({ type: "start", partial: createAssistantMessage("") }); stream.push({ type: "done", reason: "stop", message: createAssistantMessage("Done") }); }); return stream; }, }); const sessionManager = SessionManager.inMemory(); const settingsManager = SettingsManager.create(tempDir, tempDir); const authStorage = AuthStorage.create(join(tempDir, "auth.json")); const modelRegistry = new ModelRegistry(authStorage, tempDir); authStorage.setRuntimeApiKey("anthropic", "test-key"); session = new AgentSession({ agent, sessionManager, settingsManager, cwd: tempDir, modelRegistry, resourceLoader: createTestResourceLoader(), }); // First prompt completes await session.prompt("First message"); // Should not be streaming anymore expect(session.isStreaming).toBe(false); // Second prompt should work await expect(session.prompt("Second message")).resolves.not.toThrow(); }); });