co-mono/packages/coding-agent/test/agent-session-concurrent.test.ts
Mario Zechner b846a4bfcf feat(coding-agent): ResourceLoader, package management, and /reload command (#645)
- Add ResourceLoader interface and DefaultResourceLoader implementation
- Add PackageManager for npm/git extension sources with install/remove/update
- Add session.reload() and session.bindExtensions() APIs
- Add /reload command in interactive mode
- Add CLI flags: --skill, --theme, --prompt-template, --no-themes, --no-prompt-templates
- Add pi install/remove/update commands for extension management
- Refactor settings.json to use arrays for skills, prompts, themes
- Remove legacy SkillsSettings source flags and filters
- Update SDK examples and documentation for ResourceLoader pattern
- Add theme registration and loadThemeFromPath for dynamic themes
- Add getShellEnv to include bin dir in PATH for bash commands
2026-01-22 13:49:38 +01:00

217 lines
6.3 KiB
TypeScript

/**
* Tests for AgentSession concurrent prompt guard.
*/
import { existsSync, mkdirSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { Agent } from "@mariozechner/pi-agent-core";
import { type AssistantMessage, type AssistantMessageEvent, EventStream, getModel } from "@mariozechner/pi-ai";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { AgentSession } from "../src/core/agent-session.js";
import { AuthStorage } from "../src/core/auth-storage.js";
import { ModelRegistry } from "../src/core/model-registry.js";
import { SessionManager } from "../src/core/session-manager.js";
import { SettingsManager } from "../src/core/settings-manager.js";
import { createTestResourceLoader } from "./utilities.js";
// Mock stream that mimics AssistantMessageEventStream
class MockAssistantStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
constructor() {
super(
(event) => event.type === "done" || event.type === "error",
(event) => {
if (event.type === "done") return event.message;
if (event.type === "error") return event.error;
throw new Error("Unexpected event type");
},
);
}
}
function createAssistantMessage(text: string): AssistantMessage {
return {
role: "assistant",
content: [{ type: "text", text }],
api: "anthropic-messages",
provider: "anthropic",
model: "mock",
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: "stop",
timestamp: Date.now(),
};
}
describe("AgentSession concurrent prompt guard", () => {
let session: AgentSession;
let tempDir: string;
beforeEach(() => {
tempDir = join(tmpdir(), `pi-concurrent-test-${Date.now()}`);
mkdirSync(tempDir, { recursive: true });
});
afterEach(async () => {
if (session) {
session.dispose();
}
if (tempDir && existsSync(tempDir)) {
rmSync(tempDir, { recursive: true });
}
});
function createSession() {
const model = getModel("anthropic", "claude-sonnet-4-5")!;
let abortSignal: AbortSignal | undefined;
// Use a stream function that responds to abort
const agent = new Agent({
getApiKey: () => "test-key",
initialState: {
model,
systemPrompt: "Test",
tools: [],
},
streamFn: (_model, _context, options) => {
abortSignal = options?.signal;
const stream = new MockAssistantStream();
queueMicrotask(() => {
stream.push({ type: "start", partial: createAssistantMessage("") });
const checkAbort = () => {
if (abortSignal?.aborted) {
stream.push({ type: "error", reason: "aborted", error: createAssistantMessage("Aborted") });
} else {
setTimeout(checkAbort, 5);
}
};
checkAbort();
});
return stream;
},
});
const sessionManager = SessionManager.inMemory();
const settingsManager = SettingsManager.create(tempDir, tempDir);
const authStorage = new AuthStorage(join(tempDir, "auth.json"));
const modelRegistry = new ModelRegistry(authStorage, tempDir);
// Set a runtime API key so validation passes
authStorage.setRuntimeApiKey("anthropic", "test-key");
session = new AgentSession({
agent,
sessionManager,
settingsManager,
cwd: tempDir,
modelRegistry,
resourceLoader: createTestResourceLoader(),
});
return session;
}
it("should throw when prompt() called while streaming", async () => {
createSession();
// Start first prompt (don't await, it will block until abort)
const firstPrompt = session.prompt("First message");
// Wait a tick for isStreaming to be set
await new Promise((resolve) => setTimeout(resolve, 10));
// Verify we're streaming
expect(session.isStreaming).toBe(true);
// Second prompt should reject
await expect(session.prompt("Second message")).rejects.toThrow(
"Agent is already processing. Specify streamingBehavior ('steer' or 'followUp') to queue the message.",
);
// Cleanup
await session.abort();
await firstPrompt.catch(() => {}); // Ignore abort error
});
it("should allow steer() while streaming", async () => {
createSession();
// Start first prompt
const firstPrompt = session.prompt("First message");
await new Promise((resolve) => setTimeout(resolve, 10));
// steer should work while streaming
expect(() => session.steer("Steering message")).not.toThrow();
expect(session.pendingMessageCount).toBe(1);
// Cleanup
await session.abort();
await firstPrompt.catch(() => {});
});
it("should allow followUp() while streaming", async () => {
createSession();
// Start first prompt
const firstPrompt = session.prompt("First message");
await new Promise((resolve) => setTimeout(resolve, 10));
// followUp should work while streaming
expect(() => session.followUp("Follow-up message")).not.toThrow();
expect(session.pendingMessageCount).toBe(1);
// Cleanup
await session.abort();
await firstPrompt.catch(() => {});
});
it("should allow prompt() after previous completes", async () => {
// Create session with a stream that completes immediately
const model = getModel("anthropic", "claude-sonnet-4-5")!;
const agent = new Agent({
getApiKey: () => "test-key",
initialState: {
model,
systemPrompt: "Test",
tools: [],
},
streamFn: () => {
const stream = new MockAssistantStream();
queueMicrotask(() => {
stream.push({ type: "start", partial: createAssistantMessage("") });
stream.push({ type: "done", reason: "stop", message: createAssistantMessage("Done") });
});
return stream;
},
});
const sessionManager = SessionManager.inMemory();
const settingsManager = SettingsManager.create(tempDir, tempDir);
const authStorage = new AuthStorage(join(tempDir, "auth.json"));
const modelRegistry = new ModelRegistry(authStorage, tempDir);
authStorage.setRuntimeApiKey("anthropic", "test-key");
session = new AgentSession({
agent,
sessionManager,
settingsManager,
cwd: tempDir,
modelRegistry,
resourceLoader: createTestResourceLoader(),
});
// First prompt completes
await session.prompt("First message");
// Should not be streaming anymore
expect(session.isStreaming).toBe(false);
// Second prompt should work
await expect(session.prompt("Second message")).resolves.not.toThrow();
});
});