mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-15 08:03:39 +00:00
- Add ResourceLoader interface and DefaultResourceLoader implementation
- Add PackageManager for npm/git extension sources with install/remove/update
- Add session.reload() and session.bindExtensions() APIs
- Add /reload command in interactive mode
- Add CLI flags: --skill, --theme, --prompt-template, --no-themes, --no-prompt-templates
- Add pi install/remove/update commands for extension management
- Refactor settings.json to use arrays for skills, prompts, themes
- Remove legacy SkillsSettings source flags and filters
- Update SDK examples and documentation for ResourceLoader pattern
- Add theme registration and loadThemeFromPath for dynamic themes
- Add getShellEnv to include bin dir in PATH for bash commands
217 lines
6.3 KiB
TypeScript
217 lines
6.3 KiB
TypeScript
/**
 * Tests for AgentSession concurrent prompt guard.
 */
|
|
|
|
import { existsSync, mkdirSync, rmSync } from "node:fs";
|
|
import { tmpdir } from "node:os";
|
|
import { join } from "node:path";
|
|
import { Agent } from "@mariozechner/pi-agent-core";
|
|
import { type AssistantMessage, type AssistantMessageEvent, EventStream, getModel } from "@mariozechner/pi-ai";
|
|
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
|
import { AgentSession } from "../src/core/agent-session.js";
|
|
import { AuthStorage } from "../src/core/auth-storage.js";
|
|
import { ModelRegistry } from "../src/core/model-registry.js";
|
|
import { SessionManager } from "../src/core/session-manager.js";
|
|
import { SettingsManager } from "../src/core/settings-manager.js";
|
|
import { createTestResourceLoader } from "./utilities.js";
|
|
|
|
/**
 * Mock stream that mimics AssistantMessageEventStream.
 *
 * The first callback passed to `EventStream` flags "done" and "error" events
 * (presumably the events that complete the stream; confirm against
 * `EventStream` in pi-ai). The second callback maps such an event to the
 * assistant message it carries: `message` for "done", `error` for "error".
 */
class MockAssistantStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
	constructor() {
		super(
			(event) => event.type === "done" || event.type === "error",
			(event) => {
				if (event.type === "done") return event.message;
				if (event.type === "error") return event.error;
				// Only the two event types accepted by the predicate above carry a result.
				throw new Error("Unexpected event type");
			},
		);
	}
}
|
|
|
|
/**
 * Builds a minimal assistant message fixture for the mock streams.
 *
 * @param text - Text placed in the message's single text content part.
 * @returns An assistant message whose usage and cost counters are all zero,
 *          whose stop reason is "stop", and whose timestamp is the current time.
 */
function createAssistantMessage(text: string): AssistantMessage {
	return {
		role: "assistant",
		content: [{ type: "text", text }],
		api: "anthropic-messages",
		provider: "anthropic",
		model: "mock",
		usage: {
			input: 0,
			output: 0,
			cacheRead: 0,
			cacheWrite: 0,
			totalTokens: 0,
			cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
		},
		stopReason: "stop",
		timestamp: Date.now(),
	};
}
|
|
|
|
/**
 * Verifies that AgentSession rejects a second prompt() while a first one is
 * still streaming, that steer()/followUp() remain usable during streaming,
 * and that prompt() works again once the previous prompt has completed.
 */
describe("AgentSession concurrent prompt guard", () => {
	/** Builds the stream that the mocked `streamFn` returns for one model call. */
	type StreamFactory = (signal: AbortSignal | undefined) => MockAssistantStream;

	// Session created by the running test; tracked so afterEach can dispose it exactly once.
	let activeSession: AgentSession | undefined;
	let tempDir: string;

	beforeEach(() => {
		// pid + timestamp + random suffix keeps directories distinct even when
		// several runs of this file start within the same millisecond.
		const uniqueSuffix = `${process.pid}-${Date.now()}-${Math.random().toString(36).slice(2)}`;
		tempDir = join(tmpdir(), `pi-concurrent-test-${uniqueSuffix}`);
		mkdirSync(tempDir, { recursive: true });
	});

	afterEach(() => {
		activeSession?.dispose();
		// Reset so a session from an earlier test is never disposed a second time.
		activeSession = undefined;
		if (tempDir && existsSync(tempDir)) {
			rmSync(tempDir, { recursive: true, force: true });
		}
	});

	/**
	 * Stream that emits "start" and then stays open, polling every 5 ms until
	 * the given signal is aborted, at which point it emits an "error" event
	 * with reason "aborted".
	 */
	function createAbortableStream(signal: AbortSignal | undefined): MockAssistantStream {
		const stream = new MockAssistantStream();
		queueMicrotask(() => {
			stream.push({ type: "start", partial: createAssistantMessage("") });
			const checkAbort = () => {
				if (signal?.aborted) {
					stream.push({ type: "error", reason: "aborted", error: createAssistantMessage("Aborted") });
				} else {
					setTimeout(checkAbort, 5);
				}
			};
			checkAbort();
		});
		return stream;
	}

	/** Stream that emits "start" immediately followed by "done". */
	function createCompletedStream(): MockAssistantStream {
		const stream = new MockAssistantStream();
		queueMicrotask(() => {
			stream.push({ type: "start", partial: createAssistantMessage("") });
			stream.push({ type: "done", reason: "stop", message: createAssistantMessage("Done") });
		});
		return stream;
	}

	/**
	 * Creates an AgentSession backed by a mocked model stream and registers it
	 * for disposal in afterEach.
	 *
	 * @param createStream - Factory for the stream returned by the agent's
	 *   `streamFn`; defaults to a stream that stays open until aborted.
	 * @returns The newly created session.
	 * @throws Error if the test model cannot be resolved.
	 */
	function createSession(createStream: StreamFactory = createAbortableStream): AgentSession {
		const model = getModel("anthropic", "claude-sonnet-4-5");
		if (!model) {
			throw new Error("Test model anthropic/claude-sonnet-4-5 is not available");
		}

		const agent = new Agent({
			getApiKey: () => "test-key",
			initialState: {
				model,
				systemPrompt: "Test",
				tools: [],
			},
			streamFn: (_model, _context, options) => createStream(options?.signal),
		});

		const sessionManager = SessionManager.inMemory();
		const settingsManager = SettingsManager.create(tempDir, tempDir);
		const authStorage = new AuthStorage(join(tempDir, "auth.json"));
		const modelRegistry = new ModelRegistry(authStorage, tempDir);
		// Set a runtime API key so validation passes
		authStorage.setRuntimeApiKey("anthropic", "test-key");

		const session = new AgentSession({
			agent,
			sessionManager,
			settingsManager,
			cwd: tempDir,
			modelRegistry,
			resourceLoader: createTestResourceLoader(),
		});
		activeSession = session;
		return session;
	}

	/**
	 * Waits at least 10 ms, then keeps polling in 10 ms steps until the session
	 * reports `isStreaming` or the timeout elapses. Never throws: callers assert
	 * on `isStreaming` themselves so a failure is reported by the test.
	 */
	async function waitForStreaming(session: AgentSession, timeoutMs = 1000): Promise<void> {
		const deadline = Date.now() + timeoutMs;
		do {
			await new Promise((resolve) => setTimeout(resolve, 10));
		} while (!session.isStreaming && Date.now() < deadline);
	}

	it("should throw when prompt() called while streaming", async () => {
		const session = createSession();

		// Start first prompt (don't await, it will block until abort)
		const firstPrompt = session.prompt("First message");

		await waitForStreaming(session);

		// Verify we're streaming
		expect(session.isStreaming).toBe(true);

		// Second prompt should reject
		await expect(session.prompt("Second message")).rejects.toThrow(
			"Agent is already processing. Specify streamingBehavior ('steer' or 'followUp') to queue the message.",
		);

		// Cleanup
		await session.abort();
		await firstPrompt.catch(() => {}); // Ignore abort error
	});

	it("should allow steer() while streaming", async () => {
		const session = createSession();

		// Start first prompt
		const firstPrompt = session.prompt("First message");
		await waitForStreaming(session);
		expect(session.isStreaming).toBe(true);

		// steer should work while streaming
		expect(() => session.steer("Steering message")).not.toThrow();
		expect(session.pendingMessageCount).toBe(1);

		// Cleanup
		await session.abort();
		await firstPrompt.catch(() => {});
	});

	it("should allow followUp() while streaming", async () => {
		const session = createSession();

		// Start first prompt
		const firstPrompt = session.prompt("First message");
		await waitForStreaming(session);
		expect(session.isStreaming).toBe(true);

		// followUp should work while streaming
		expect(() => session.followUp("Follow-up message")).not.toThrow();
		expect(session.pendingMessageCount).toBe(1);

		// Cleanup
		await session.abort();
		await firstPrompt.catch(() => {});
	});

	it("should allow prompt() after previous completes", async () => {
		// Create session with a stream that completes immediately
		const session = createSession(createCompletedStream);

		// First prompt completes
		await session.prompt("First message");

		// Should not be streaming anymore
		expect(session.isStreaming).toBe(false);

		// Second prompt should work
		await expect(session.prompt("Second message")).resolves.not.toThrow();
	});
});
|