clanker-agent/packages/coding-agent/test/agent-session-concurrent.test.ts
Harivansh Rathi 67168d8289 chore: rebrand companion-os to clanker-agent
- Rename all package names from companion-* to clanker-*
- Update npm scopes from @mariozechner to @harivansh-afk
- Rename config directories .companion -> .clanker
- Rename environment variables COMPANION_* -> CLANKER_*
- Update all documentation, README files, and install scripts
- Rename package directories (companion-channels, companion-grind, companion-teams)
- Update GitHub URLs to harivansh-afk/clanker-agent
- Preserve full git history from companion-cloud monorepo
2026-03-26 16:22:52 -04:00

402 lines
12 KiB
TypeScript

/**
* Tests for AgentSession concurrent prompt guard.
*/
import { existsSync, mkdirSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { Agent } from "@mariozechner/clanker-agent-core";
import {
type AssistantMessage,
type AssistantMessageEvent,
EventStream,
getModel,
} from "@mariozechner/clanker-ai";
import { Type } from "@sinclair/typebox";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { AgentSession } from "../src/core/agent-session.js";
import { AuthStorage } from "../src/core/auth-storage.js";
import { ModelRegistry } from "../src/core/model-registry.js";
import { SessionManager } from "../src/core/session-manager.js";
import { SettingsManager } from "../src/core/settings-manager.js";
import { createTestResourceLoader } from "./utilities.js";
// Mock stream that mimics AssistantMessageEventStream
// Mock stream that mimics AssistantMessageEventStream
class MockAssistantStream extends EventStream<
	AssistantMessageEvent,
	AssistantMessage
> {
	constructor() {
		// The stream is finished once a terminal ("done" or "error") event arrives.
		const isTerminal = (event: AssistantMessageEvent) =>
			event.type === "done" || event.type === "error";
		// Convert the terminal event into the stream's final result value.
		const toResult = (event: AssistantMessageEvent) => {
			switch (event.type) {
				case "done":
					return event.message;
				case "error":
					return event.error;
				default:
					throw new Error("Unexpected event type");
			}
		};
		super(isTerminal, toResult);
	}
}
/**
 * Builds a minimal assistant message containing a single text block, with
 * all token/cost accounting zeroed out — tests here never assert on usage.
 */
function createAssistantMessage(text: string): AssistantMessage {
	const zeroCost = {
		input: 0,
		output: 0,
		cacheRead: 0,
		cacheWrite: 0,
		total: 0,
	};
	const usage = {
		input: 0,
		output: 0,
		cacheRead: 0,
		cacheWrite: 0,
		totalTokens: 0,
		cost: zeroCost,
	};
	return {
		role: "assistant",
		content: [{ type: "text", text }],
		api: "anthropic-messages",
		provider: "anthropic",
		model: "mock",
		usage,
		stopReason: "stop",
		timestamp: Date.now(),
	};
}
/**
 * Suite for AgentSession's concurrent-prompt guard:
 * - a bare prompt() must reject while a previous prompt is still streaming;
 * - steer() and followUp() are the sanctioned ways to queue messages mid-stream;
 * - prompt() works again once the previous turn completes;
 * - persisted message_end events keep their order even when an extension
 *   handler is slow.
 */
describe("AgentSession concurrent prompt guard", () => {
	let session: AgentSession;
	let tempDir: string;
	beforeEach(() => {
		// Fresh scratch directory per test for settings/auth/session files.
		// NOTE(review): Date.now() alone could collide if tests ever run in
		// parallel within the same millisecond — mkdtempSync would be safer.
		tempDir = join(tmpdir(), `clanker-concurrent-test-${Date.now()}`);
		mkdirSync(tempDir, { recursive: true });
	});
	afterEach(async () => {
		// Tear down the session first so nothing is still writing to tempDir.
		if (session) {
			session.dispose();
		}
		if (tempDir && existsSync(tempDir)) {
			rmSync(tempDir, { recursive: true });
		}
	});
	/**
	 * Builds an AgentSession whose mock stream never completes on its own:
	 * it emits a "start" event, then polls every 5ms until the abort signal
	 * fires, at which point it emits a terminal "error" event. This keeps
	 * the session in the streaming state until the test calls abort().
	 */
	function createSession() {
		const model = getModel("anthropic", "claude-sonnet-4-5")!;
		let abortSignal: AbortSignal | undefined;
		// Use a stream function that responds to abort
		const agent = new Agent({
			getApiKey: () => "test-key",
			initialState: {
				model,
				systemPrompt: "Test",
				tools: [],
			},
			streamFn: (_model, _context, options) => {
				// Capture the signal so the poll loop below can observe aborts.
				abortSignal = options?.signal;
				const stream = new MockAssistantStream();
				queueMicrotask(() => {
					stream.push({ type: "start", partial: createAssistantMessage("") });
					// Poll the abort signal; emit a terminal "error" event once aborted.
					const checkAbort = () => {
						if (abortSignal?.aborted) {
							stream.push({
								type: "error",
								reason: "aborted",
								error: createAssistantMessage("Aborted"),
							});
						} else {
							setTimeout(checkAbort, 5);
						}
					};
					checkAbort();
				});
				return stream;
			},
		});
		const sessionManager = SessionManager.inMemory();
		const settingsManager = SettingsManager.create(tempDir, tempDir);
		const authStorage = AuthStorage.create(join(tempDir, "auth.json"));
		const modelRegistry = new ModelRegistry(authStorage, tempDir);
		// Set a runtime API key so validation passes
		authStorage.setRuntimeApiKey("anthropic", "test-key");
		// Also assigns to the suite-level `session` so afterEach disposes it.
		session = new AgentSession({
			agent,
			sessionManager,
			settingsManager,
			cwd: tempDir,
			modelRegistry,
			resourceLoader: createTestResourceLoader(),
		});
		return session;
	}
	it("should throw when prompt() called while streaming", async () => {
		createSession();
		// Start first prompt (don't await, it will block until abort)
		const firstPrompt = session.prompt("First message");
		// Wait a tick for isStreaming to be set
		await new Promise((resolve) => setTimeout(resolve, 10));
		// Verify we're streaming
		expect(session.isStreaming).toBe(true);
		// Second prompt should reject
		await expect(session.prompt("Second message")).rejects.toThrow(
			"Agent is already processing. Specify streamingBehavior ('steer' or 'followUp') to queue the message.",
		);
		// Cleanup
		await session.abort();
		await firstPrompt.catch(() => {}); // Ignore abort error
	});
	it("should allow steer() while streaming", async () => {
		createSession();
		// Start first prompt
		const firstPrompt = session.prompt("First message");
		await new Promise((resolve) => setTimeout(resolve, 10));
		// steer should work while streaming
		expect(() => session.steer("Steering message")).not.toThrow();
		expect(session.pendingMessageCount).toBe(1);
		// Cleanup
		await session.abort();
		await firstPrompt.catch(() => {});
	});
	it("should allow followUp() while streaming", async () => {
		createSession();
		// Start first prompt
		const firstPrompt = session.prompt("First message");
		await new Promise((resolve) => setTimeout(resolve, 10));
		// followUp should work while streaming
		expect(() => session.followUp("Follow-up message")).not.toThrow();
		expect(session.pendingMessageCount).toBe(1);
		// Cleanup
		await session.abort();
		await firstPrompt.catch(() => {});
	});
	it("should allow prompt() after previous completes", async () => {
		// Create session with a stream that completes immediately
		// (unlike createSession(), whose stream only ends on abort).
		const model = getModel("anthropic", "claude-sonnet-4-5")!;
		const agent = new Agent({
			getApiKey: () => "test-key",
			initialState: {
				model,
				systemPrompt: "Test",
				tools: [],
			},
			streamFn: () => {
				const stream = new MockAssistantStream();
				queueMicrotask(() => {
					stream.push({ type: "start", partial: createAssistantMessage("") });
					// Terminal "done" right away, so the prompt resolves promptly.
					stream.push({
						type: "done",
						reason: "stop",
						message: createAssistantMessage("Done"),
					});
				});
				return stream;
			},
		});
		const sessionManager = SessionManager.inMemory();
		const settingsManager = SettingsManager.create(tempDir, tempDir);
		const authStorage = AuthStorage.create(join(tempDir, "auth.json"));
		const modelRegistry = new ModelRegistry(authStorage, tempDir);
		authStorage.setRuntimeApiKey("anthropic", "test-key");
		session = new AgentSession({
			agent,
			sessionManager,
			settingsManager,
			cwd: tempDir,
			modelRegistry,
			resourceLoader: createTestResourceLoader(),
		});
		// First prompt completes
		await session.prompt("First message");
		// Should not be streaming anymore
		expect(session.isStreaming).toBe(false);
		// Second prompt should work
		await expect(session.prompt("Second message")).resolves.not.toThrow();
	});
	it("should persist message_end events in order with slow extension handlers", async () => {
		const model = getModel("anthropic", "claude-sonnet-4-5")!;
		// Minimal tool that echoes its "q" argument back as "result:<q>".
		const tool = {
			name: "dummy",
			description: "Dummy tool",
			label: "dummy",
			parameters: Type.Object({ q: Type.String() }),
			execute: async (_toolCallId: string, params: unknown) => {
				// Defensive narrowing: params arrives as unknown from the agent.
				const q =
					typeof params === "object" && params !== null && "q" in params
						? String((params as { q: unknown }).q)
						: "";
				return {
					content: [{ type: "text" as const, text: `result:${q}` }],
					details: {},
				};
			},
		};
		// Two-turn mock: the first assistant message requests the "dummy" tool;
		// once a toolResult is present in the context, the second turn replies
		// "done" and stops.
		const agent = new Agent({
			getApiKey: () => "test-key",
			initialState: {
				model,
				systemPrompt: "Test",
				tools: [tool],
			},
			streamFn: async (_model, context) => {
				const stream = new MockAssistantStream();
				queueMicrotask(() => {
					const hasToolResult = context.messages.some(
						(message) => message.role === "toolResult",
					);
					if (hasToolResult) {
						// Second turn: final plain-text answer, stopReason "stop".
						const message: AssistantMessage = {
							role: "assistant",
							content: [{ type: "text", text: "done" }],
							api: "anthropic-messages",
							provider: "anthropic",
							model: "mock",
							usage: {
								input: 1,
								output: 1,
								cacheRead: 0,
								cacheWrite: 0,
								totalTokens: 2,
								cost: {
									input: 0,
									output: 0,
									cacheRead: 0,
									cacheWrite: 0,
									total: 0,
								},
							},
							stopReason: "stop",
							timestamp: Date.now(),
						};
						stream.push({
							type: "start",
							partial: { ...message, content: [] },
						});
						stream.push({ type: "done", reason: "stop", message });
						return;
					}
					// First turn: assistant text plus a toolCall, stopReason "toolUse",
					// which drives the agent to execute the dummy tool.
					const message: AssistantMessage = {
						role: "assistant",
						content: [
							{ type: "text", text: "calling tool" },
							{
								type: "toolCall",
								id: "toolu_1",
								name: "dummy",
								arguments: { q: "x" },
							},
						],
						api: "anthropic-messages",
						provider: "anthropic",
						model: "mock",
						usage: {
							input: 1,
							output: 1,
							cacheRead: 0,
							cacheWrite: 0,
							totalTokens: 2,
							cost: {
								input: 0,
								output: 0,
								cacheRead: 0,
								cacheWrite: 0,
								total: 0,
							},
						},
						stopReason: "toolUse",
						timestamp: Date.now(),
					};
					stream.push({ type: "start", partial: { ...message, content: [] } });
					stream.push({ type: "done", reason: "toolUse", message });
				});
				return stream;
			},
		});
		const sessionManager = SessionManager.inMemory();
		const settingsManager = SettingsManager.create(tempDir, tempDir);
		const authStorage = AuthStorage.create(join(tempDir, "auth.json"));
		const modelRegistry = new ModelRegistry(authStorage, tempDir);
		authStorage.setRuntimeApiKey("anthropic", "test-key");
		session = new AgentSession({
			agent,
			sessionManager,
			settingsManager,
			cwd: tempDir,
			modelRegistry,
			resourceLoader: createTestResourceLoader(),
			baseToolsOverride: { dummy: tool },
		});
		// Reach into AgentSession internals to replace the private extension
		// runner. NOTE(review): this couples the test to the `_extensionRunner`
		// field name and its method shapes — it will break silently if the
		// internal API changes.
		const sessionWithRunner = session as unknown as {
			_extensionRunner?: {
				hasHandlers: (eventType: string) => boolean;
				emit: (event: {
					type: string;
					message?: { role?: string };
				}) => Promise<void>;
				emitInput: (
					text: string,
					images: unknown,
					source: "interactive" | "rpc" | "extension",
				) => Promise<{ action: "continue" }>;
				emitBeforeAgentStart: (
					prompt: string,
					images: unknown,
					systemPrompt: string,
				) => Promise<undefined>;
			};
		};
		sessionWithRunner._extensionRunner = {
			hasHandlers: () => false,
			emit: async (event) => {
				// Artificially slow down message_end handling for assistant
				// messages to try to provoke out-of-order persistence.
				if (
					event.type === "message_end" &&
					event.message?.role === "assistant"
				) {
					await new Promise((resolve) => setTimeout(resolve, 40));
				}
			},
			emitInput: async () => ({ action: "continue" }),
			emitBeforeAgentStart: async () => undefined,
		};
		await session.prompt("hi");
		await session.agent.waitForIdle();
		// Grace period so the delayed (40ms) message_end handlers finish
		// persisting before we inspect the session entries.
		await new Promise((resolve) => setTimeout(resolve, 100));
		const messageEntries = sessionManager
			.getEntries()
			.filter((entry) => entry.type === "message");
		// Expected persisted order for the two-turn tool exchange.
		expect(messageEntries.map((entry) => entry.message.role)).toEqual([
			"user",
			"assistant",
			"toolResult",
			"assistant",
		]);
	});
});