mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-15 10:05:14 +00:00
352 lines
10 KiB
TypeScript
352 lines
10 KiB
TypeScript
/**
|
|
* Tests for AgentSession concurrent prompt guard.
|
|
*/
|
|
|
|
import { existsSync, mkdirSync, rmSync } from "node:fs";
|
|
import { tmpdir } from "node:os";
|
|
import { join } from "node:path";
|
|
import { Agent } from "@mariozechner/pi-agent-core";
|
|
import { type AssistantMessage, type AssistantMessageEvent, EventStream, getModel } from "@mariozechner/pi-ai";
|
|
import { Type } from "@sinclair/typebox";
|
|
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
|
import { AgentSession } from "../src/core/agent-session.js";
|
|
import { AuthStorage } from "../src/core/auth-storage.js";
|
|
import { ModelRegistry } from "../src/core/model-registry.js";
|
|
import { SessionManager } from "../src/core/session-manager.js";
|
|
import { SettingsManager } from "../src/core/settings-manager.js";
|
|
import { createTestResourceLoader } from "./utilities.js";
|
|
|
|
// Mock stream that mimics AssistantMessageEventStream
|
|
class MockAssistantStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
|
|
constructor() {
|
|
super(
|
|
(event) => event.type === "done" || event.type === "error",
|
|
(event) => {
|
|
if (event.type === "done") return event.message;
|
|
if (event.type === "error") return event.error;
|
|
throw new Error("Unexpected event type");
|
|
},
|
|
);
|
|
}
|
|
}
|
|
|
|
function createAssistantMessage(text: string): AssistantMessage {
|
|
return {
|
|
role: "assistant",
|
|
content: [{ type: "text", text }],
|
|
api: "anthropic-messages",
|
|
provider: "anthropic",
|
|
model: "mock",
|
|
usage: {
|
|
input: 0,
|
|
output: 0,
|
|
cacheRead: 0,
|
|
cacheWrite: 0,
|
|
totalTokens: 0,
|
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
|
},
|
|
stopReason: "stop",
|
|
timestamp: Date.now(),
|
|
};
|
|
}
|
|
|
|
describe("AgentSession concurrent prompt guard", () => {
|
|
let session: AgentSession;
|
|
let tempDir: string;
|
|
|
|
beforeEach(() => {
|
|
tempDir = join(tmpdir(), `pi-concurrent-test-${Date.now()}`);
|
|
mkdirSync(tempDir, { recursive: true });
|
|
});
|
|
|
|
afterEach(async () => {
|
|
if (session) {
|
|
session.dispose();
|
|
}
|
|
if (tempDir && existsSync(tempDir)) {
|
|
rmSync(tempDir, { recursive: true });
|
|
}
|
|
});
|
|
|
|
function createSession() {
|
|
const model = getModel("anthropic", "claude-sonnet-4-5")!;
|
|
let abortSignal: AbortSignal | undefined;
|
|
|
|
// Use a stream function that responds to abort
|
|
const agent = new Agent({
|
|
getApiKey: () => "test-key",
|
|
initialState: {
|
|
model,
|
|
systemPrompt: "Test",
|
|
tools: [],
|
|
},
|
|
streamFn: (_model, _context, options) => {
|
|
abortSignal = options?.signal;
|
|
const stream = new MockAssistantStream();
|
|
queueMicrotask(() => {
|
|
stream.push({ type: "start", partial: createAssistantMessage("") });
|
|
const checkAbort = () => {
|
|
if (abortSignal?.aborted) {
|
|
stream.push({ type: "error", reason: "aborted", error: createAssistantMessage("Aborted") });
|
|
} else {
|
|
setTimeout(checkAbort, 5);
|
|
}
|
|
};
|
|
checkAbort();
|
|
});
|
|
return stream;
|
|
},
|
|
});
|
|
|
|
const sessionManager = SessionManager.inMemory();
|
|
const settingsManager = SettingsManager.create(tempDir, tempDir);
|
|
const authStorage = AuthStorage.create(join(tempDir, "auth.json"));
|
|
const modelRegistry = new ModelRegistry(authStorage, tempDir);
|
|
// Set a runtime API key so validation passes
|
|
authStorage.setRuntimeApiKey("anthropic", "test-key");
|
|
|
|
session = new AgentSession({
|
|
agent,
|
|
sessionManager,
|
|
settingsManager,
|
|
cwd: tempDir,
|
|
modelRegistry,
|
|
resourceLoader: createTestResourceLoader(),
|
|
});
|
|
|
|
return session;
|
|
}
|
|
|
|
it("should throw when prompt() called while streaming", async () => {
|
|
createSession();
|
|
|
|
// Start first prompt (don't await, it will block until abort)
|
|
const firstPrompt = session.prompt("First message");
|
|
|
|
// Wait a tick for isStreaming to be set
|
|
await new Promise((resolve) => setTimeout(resolve, 10));
|
|
|
|
// Verify we're streaming
|
|
expect(session.isStreaming).toBe(true);
|
|
|
|
// Second prompt should reject
|
|
await expect(session.prompt("Second message")).rejects.toThrow(
|
|
"Agent is already processing. Specify streamingBehavior ('steer' or 'followUp') to queue the message.",
|
|
);
|
|
|
|
// Cleanup
|
|
await session.abort();
|
|
await firstPrompt.catch(() => {}); // Ignore abort error
|
|
});
|
|
|
|
it("should allow steer() while streaming", async () => {
|
|
createSession();
|
|
|
|
// Start first prompt
|
|
const firstPrompt = session.prompt("First message");
|
|
await new Promise((resolve) => setTimeout(resolve, 10));
|
|
|
|
// steer should work while streaming
|
|
expect(() => session.steer("Steering message")).not.toThrow();
|
|
expect(session.pendingMessageCount).toBe(1);
|
|
|
|
// Cleanup
|
|
await session.abort();
|
|
await firstPrompt.catch(() => {});
|
|
});
|
|
|
|
it("should allow followUp() while streaming", async () => {
|
|
createSession();
|
|
|
|
// Start first prompt
|
|
const firstPrompt = session.prompt("First message");
|
|
await new Promise((resolve) => setTimeout(resolve, 10));
|
|
|
|
// followUp should work while streaming
|
|
expect(() => session.followUp("Follow-up message")).not.toThrow();
|
|
expect(session.pendingMessageCount).toBe(1);
|
|
|
|
// Cleanup
|
|
await session.abort();
|
|
await firstPrompt.catch(() => {});
|
|
});
|
|
|
|
it("should allow prompt() after previous completes", async () => {
|
|
// Create session with a stream that completes immediately
|
|
const model = getModel("anthropic", "claude-sonnet-4-5")!;
|
|
const agent = new Agent({
|
|
getApiKey: () => "test-key",
|
|
initialState: {
|
|
model,
|
|
systemPrompt: "Test",
|
|
tools: [],
|
|
},
|
|
streamFn: () => {
|
|
const stream = new MockAssistantStream();
|
|
queueMicrotask(() => {
|
|
stream.push({ type: "start", partial: createAssistantMessage("") });
|
|
stream.push({ type: "done", reason: "stop", message: createAssistantMessage("Done") });
|
|
});
|
|
return stream;
|
|
},
|
|
});
|
|
|
|
const sessionManager = SessionManager.inMemory();
|
|
const settingsManager = SettingsManager.create(tempDir, tempDir);
|
|
const authStorage = AuthStorage.create(join(tempDir, "auth.json"));
|
|
const modelRegistry = new ModelRegistry(authStorage, tempDir);
|
|
authStorage.setRuntimeApiKey("anthropic", "test-key");
|
|
|
|
session = new AgentSession({
|
|
agent,
|
|
sessionManager,
|
|
settingsManager,
|
|
cwd: tempDir,
|
|
modelRegistry,
|
|
resourceLoader: createTestResourceLoader(),
|
|
});
|
|
|
|
// First prompt completes
|
|
await session.prompt("First message");
|
|
|
|
// Should not be streaming anymore
|
|
expect(session.isStreaming).toBe(false);
|
|
|
|
// Second prompt should work
|
|
await expect(session.prompt("Second message")).resolves.not.toThrow();
|
|
});
|
|
|
|
it("should persist message_end events in order with slow extension handlers", async () => {
|
|
const model = getModel("anthropic", "claude-sonnet-4-5")!;
|
|
const tool = {
|
|
name: "dummy",
|
|
description: "Dummy tool",
|
|
label: "dummy",
|
|
parameters: Type.Object({ q: Type.String() }),
|
|
execute: async (_toolCallId: string, params: unknown) => {
|
|
const q =
|
|
typeof params === "object" && params !== null && "q" in params
|
|
? String((params as { q: unknown }).q)
|
|
: "";
|
|
return {
|
|
content: [{ type: "text" as const, text: `result:${q}` }],
|
|
details: {},
|
|
};
|
|
},
|
|
};
|
|
|
|
const agent = new Agent({
|
|
getApiKey: () => "test-key",
|
|
initialState: {
|
|
model,
|
|
systemPrompt: "Test",
|
|
tools: [tool],
|
|
},
|
|
streamFn: async (_model, context) => {
|
|
const stream = new MockAssistantStream();
|
|
queueMicrotask(() => {
|
|
const hasToolResult = context.messages.some((message) => message.role === "toolResult");
|
|
|
|
if (hasToolResult) {
|
|
const message: AssistantMessage = {
|
|
role: "assistant",
|
|
content: [{ type: "text", text: "done" }],
|
|
api: "anthropic-messages",
|
|
provider: "anthropic",
|
|
model: "mock",
|
|
usage: {
|
|
input: 1,
|
|
output: 1,
|
|
cacheRead: 0,
|
|
cacheWrite: 0,
|
|
totalTokens: 2,
|
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
|
},
|
|
stopReason: "stop",
|
|
timestamp: Date.now(),
|
|
};
|
|
stream.push({ type: "start", partial: { ...message, content: [] } });
|
|
stream.push({ type: "done", reason: "stop", message });
|
|
return;
|
|
}
|
|
|
|
const message: AssistantMessage = {
|
|
role: "assistant",
|
|
content: [
|
|
{ type: "text", text: "calling tool" },
|
|
{ type: "toolCall", id: "toolu_1", name: "dummy", arguments: { q: "x" } },
|
|
],
|
|
api: "anthropic-messages",
|
|
provider: "anthropic",
|
|
model: "mock",
|
|
usage: {
|
|
input: 1,
|
|
output: 1,
|
|
cacheRead: 0,
|
|
cacheWrite: 0,
|
|
totalTokens: 2,
|
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
|
},
|
|
stopReason: "toolUse",
|
|
timestamp: Date.now(),
|
|
};
|
|
|
|
stream.push({ type: "start", partial: { ...message, content: [] } });
|
|
stream.push({ type: "done", reason: "toolUse", message });
|
|
});
|
|
return stream;
|
|
},
|
|
});
|
|
|
|
const sessionManager = SessionManager.inMemory();
|
|
const settingsManager = SettingsManager.create(tempDir, tempDir);
|
|
const authStorage = AuthStorage.create(join(tempDir, "auth.json"));
|
|
const modelRegistry = new ModelRegistry(authStorage, tempDir);
|
|
authStorage.setRuntimeApiKey("anthropic", "test-key");
|
|
|
|
session = new AgentSession({
|
|
agent,
|
|
sessionManager,
|
|
settingsManager,
|
|
cwd: tempDir,
|
|
modelRegistry,
|
|
resourceLoader: createTestResourceLoader(),
|
|
baseToolsOverride: { dummy: tool },
|
|
});
|
|
|
|
const sessionWithRunner = session as unknown as {
|
|
_extensionRunner?: {
|
|
hasHandlers: (eventType: string) => boolean;
|
|
emit: (event: { type: string; message?: { role?: string } }) => Promise<void>;
|
|
emitInput: (
|
|
text: string,
|
|
images: unknown,
|
|
source: "interactive" | "rpc" | "extension",
|
|
) => Promise<{ action: "continue" }>;
|
|
emitBeforeAgentStart: (prompt: string, images: unknown, systemPrompt: string) => Promise<undefined>;
|
|
};
|
|
};
|
|
sessionWithRunner._extensionRunner = {
|
|
hasHandlers: () => false,
|
|
emit: async (event) => {
|
|
if (event.type === "message_end" && event.message?.role === "assistant") {
|
|
await new Promise((resolve) => setTimeout(resolve, 40));
|
|
}
|
|
},
|
|
emitInput: async () => ({ action: "continue" }),
|
|
emitBeforeAgentStart: async () => undefined,
|
|
};
|
|
|
|
await session.prompt("hi");
|
|
await session.agent.waitForIdle();
|
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
|
|
|
const messageEntries = sessionManager.getEntries().filter((entry) => entry.type === "message");
|
|
expect(messageEntries.map((entry) => entry.message.role)).toEqual([
|
|
"user",
|
|
"assistant",
|
|
"toolResult",
|
|
"assistant",
|
|
]);
|
|
});
|
|
});
|