co-mono/packages/coding-agent/test/agent-session-concurrent.test.ts
Mario Zechner 5ef3cc90d1 Add guard against concurrent prompt() calls
Agent.prompt() and Agent.continue() now throw if called while already
streaming, preventing race conditions and corrupted state. Use
queueMessage() to queue messages during streaming, or await the
previous call.

AgentSession.prompt() has the same guard with a message directing
users to queueMessage().

Ref #403
2026-01-02 22:02:24 +01:00

196 lines
5.6 KiB
TypeScript

/**
* Tests for AgentSession concurrent prompt guard.
*/
import { existsSync, mkdirSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { Agent } from "@mariozechner/pi-agent-core";
import { type AssistantMessage, type AssistantMessageEvent, EventStream, getModel } from "@mariozechner/pi-ai";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { AgentSession } from "../src/core/agent-session.js";
import { AuthStorage } from "../src/core/auth-storage.js";
import { ModelRegistry } from "../src/core/model-registry.js";
import { SessionManager } from "../src/core/session-manager.js";
import { SettingsManager } from "../src/core/settings-manager.js";
// Mock stream that mimics AssistantMessageEventStream
class MockAssistantStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
constructor() {
super(
(event) => event.type === "done" || event.type === "error",
(event) => {
if (event.type === "done") return event.message;
if (event.type === "error") return event.error;
throw new Error("Unexpected event type");
},
);
}
}
function createAssistantMessage(text: string): AssistantMessage {
return {
role: "assistant",
content: [{ type: "text", text }],
api: "anthropic-messages",
provider: "anthropic",
model: "mock",
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "stop",
timestamp: Date.now(),
};
}
describe("AgentSession concurrent prompt guard", () => {
let session: AgentSession;
let tempDir: string;
beforeEach(() => {
tempDir = join(tmpdir(), `pi-concurrent-test-${Date.now()}`);
mkdirSync(tempDir, { recursive: true });
});
afterEach(async () => {
if (session) {
session.dispose();
}
if (tempDir && existsSync(tempDir)) {
rmSync(tempDir, { recursive: true });
}
});
function createSession() {
const model = getModel("anthropic", "claude-sonnet-4-5")!;
let abortSignal: AbortSignal | undefined;
// Use a stream function that responds to abort
const agent = new Agent({
getApiKey: () => "test-key",
initialState: {
model,
systemPrompt: "Test",
tools: [],
},
streamFn: (_model, _context, options) => {
abortSignal = options?.signal;
const stream = new MockAssistantStream();
queueMicrotask(() => {
stream.push({ type: "start", partial: createAssistantMessage("") });
const checkAbort = () => {
if (abortSignal?.aborted) {
stream.push({ type: "error", reason: "aborted", error: createAssistantMessage("Aborted") });
} else {
setTimeout(checkAbort, 5);
}
};
checkAbort();
});
return stream;
},
});
const sessionManager = SessionManager.inMemory();
const settingsManager = SettingsManager.create(tempDir, tempDir);
const authStorage = new AuthStorage(join(tempDir, "auth.json"));
const modelRegistry = new ModelRegistry(authStorage, tempDir);
// Set a runtime API key so validation passes
authStorage.setRuntimeApiKey("anthropic", "test-key");
session = new AgentSession({
agent,
sessionManager,
settingsManager,
modelRegistry,
});
return session;
}
it("should throw when prompt() called while streaming", async () => {
createSession();
// Start first prompt (don't await, it will block until abort)
const firstPrompt = session.prompt("First message");
// Wait a tick for isStreaming to be set
await new Promise((resolve) => setTimeout(resolve, 10));
// Verify we're streaming
expect(session.isStreaming).toBe(true);
// Second prompt should reject
await expect(session.prompt("Second message")).rejects.toThrow(
"Agent is already processing. Use queueMessage() to queue messages during streaming.",
);
// Cleanup
await session.abort();
await firstPrompt.catch(() => {}); // Ignore abort error
});
it("should allow queueMessage() while streaming", async () => {
createSession();
// Start first prompt
const firstPrompt = session.prompt("First message");
await new Promise((resolve) => setTimeout(resolve, 10));
// queueMessage should work while streaming
expect(() => session.queueMessage("Queued message")).not.toThrow();
expect(session.queuedMessageCount).toBe(1);
// Cleanup
await session.abort();
await firstPrompt.catch(() => {});
});
it("should allow prompt() after previous completes", async () => {
// Create session with a stream that completes immediately
const model = getModel("anthropic", "claude-sonnet-4-5")!;
const agent = new Agent({
getApiKey: () => "test-key",
initialState: {
model,
systemPrompt: "Test",
tools: [],
},
streamFn: () => {
const stream = new MockAssistantStream();
queueMicrotask(() => {
stream.push({ type: "start", partial: createAssistantMessage("") });
stream.push({ type: "done", reason: "stop", message: createAssistantMessage("Done") });
});
return stream;
},
});
const sessionManager = SessionManager.inMemory();
const settingsManager = SettingsManager.create(tempDir, tempDir);
const authStorage = new AuthStorage(join(tempDir, "auth.json"));
const modelRegistry = new ModelRegistry(authStorage, tempDir);
authStorage.setRuntimeApiKey("anthropic", "test-key");
session = new AgentSession({
agent,
sessionManager,
settingsManager,
modelRegistry,
});
// First prompt completes
await session.prompt("First message");
// Should not be streaming anymore
expect(session.isStreaming).toBe(false);
// Second prompt should work
await expect(session.prompt("Second message")).resolves.not.toThrow();
});
});