sandbox-agent/factory/packages/client/test/e2e/workbench-load-e2e.test.ts
2026-03-09 14:00:20 -07:00

331 lines
11 KiB
TypeScript

import { describe, expect, it } from "vitest";
import type {
HandoffWorkbenchSnapshot,
WorkbenchAgentTab,
WorkbenchHandoff,
WorkbenchModelId,
WorkbenchTranscriptEvent,
} from "@openhandoff/shared";
import { createBackendClient } from "../../src/backend-client.js";
const RUN_WORKBENCH_LOAD_E2E = process.env.HF_ENABLE_DAEMON_WORKBENCH_LOAD_E2E === "1";
function requiredEnv(name: string): string {
const value = process.env[name]?.trim();
if (!value) {
throw new Error(`Missing required env var: ${name}`);
}
return value;
}
function workbenchModelEnv(name: string, fallback: WorkbenchModelId): WorkbenchModelId {
const value = process.env[name]?.trim();
switch (value) {
case "claude-sonnet-4":
case "claude-opus-4":
case "gpt-4o":
case "o3":
return value;
default:
return fallback;
}
}
function intEnv(name: string, fallback: number): number {
const raw = process.env[name]?.trim();
if (!raw) {
return fallback;
}
const value = Number.parseInt(raw, 10);
return Number.isFinite(value) && value > 0 ? value : fallback;
}
async function sleep(ms: number): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, ms));
}
async function poll<T>(
label: string,
timeoutMs: number,
intervalMs: number,
fn: () => Promise<T>,
isDone: (value: T) => boolean,
): Promise<T> {
const startedAt = Date.now();
let lastValue: T;
for (;;) {
lastValue = await fn();
if (isDone(lastValue)) {
return lastValue;
}
if (Date.now() - startedAt > timeoutMs) {
throw new Error(`timed out waiting for ${label}`);
}
await sleep(intervalMs);
}
}
function findHandoff(snapshot: HandoffWorkbenchSnapshot, handoffId: string): WorkbenchHandoff {
const handoff = snapshot.handoffs.find((candidate) => candidate.id === handoffId);
if (!handoff) {
throw new Error(`handoff ${handoffId} missing from snapshot`);
}
return handoff;
}
function findTab(handoff: WorkbenchHandoff, tabId: string): WorkbenchAgentTab {
const tab = handoff.tabs.find((candidate) => candidate.id === tabId);
if (!tab) {
throw new Error(`tab ${tabId} missing from handoff ${handoff.id}`);
}
return tab;
}
function extractEventText(event: WorkbenchTranscriptEvent): string {
const payload = event.payload;
if (!payload || typeof payload !== "object") {
return String(payload ?? "");
}
const envelope = payload as {
method?: unknown;
params?: unknown;
result?: unknown;
};
const params = envelope.params;
if (params && typeof params === "object") {
const update = (params as { update?: unknown }).update;
if (update && typeof update === "object") {
const content = (update as { content?: unknown }).content;
if (content && typeof content === "object") {
const chunkText = (content as { text?: unknown }).text;
if (typeof chunkText === "string") {
return chunkText;
}
}
}
const text = (params as { text?: unknown }).text;
if (typeof text === "string" && text.trim()) {
return text.trim();
}
const prompt = (params as { prompt?: Array<{ text?: unknown }> }).prompt;
if (Array.isArray(prompt)) {
return prompt
.map((item) => (typeof item?.text === "string" ? item.text.trim() : ""))
.filter(Boolean)
.join("\n");
}
}
const result = envelope.result;
if (result && typeof result === "object") {
const text = (result as { text?: unknown }).text;
if (typeof text === "string" && text.trim()) {
return text.trim();
}
}
return typeof envelope.method === "string" ? envelope.method : JSON.stringify(payload);
}
function transcriptIncludesAgentText(transcript: WorkbenchTranscriptEvent[], expectedText: string): boolean {
return transcript
.filter((event) => event.sender === "agent")
.map((event) => extractEventText(event))
.join("")
.includes(expectedText);
}
function average(values: number[]): number {
return values.reduce((sum, value) => sum + value, 0) / Math.max(values.length, 1);
}
async function measureWorkbenchSnapshot(
client: ReturnType<typeof createBackendClient>,
workspaceId: string,
iterations: number,
): Promise<{
avgMs: number;
maxMs: number;
payloadBytes: number;
handoffCount: number;
tabCount: number;
transcriptEventCount: number;
}> {
const durations: number[] = [];
let snapshot: HandoffWorkbenchSnapshot | null = null;
for (let index = 0; index < iterations; index += 1) {
const startedAt = performance.now();
snapshot = await client.getWorkbench(workspaceId);
durations.push(performance.now() - startedAt);
}
const finalSnapshot = snapshot ?? {
workspaceId,
repos: [],
projects: [],
handoffs: [],
};
const payloadBytes = Buffer.byteLength(JSON.stringify(finalSnapshot), "utf8");
const tabCount = finalSnapshot.handoffs.reduce((sum, handoff) => sum + handoff.tabs.length, 0);
const transcriptEventCount = finalSnapshot.handoffs.reduce(
(sum, handoff) =>
sum + handoff.tabs.reduce((tabSum, tab) => tabSum + tab.transcript.length, 0),
0,
);
return {
avgMs: Math.round(average(durations)),
maxMs: Math.round(Math.max(...durations, 0)),
payloadBytes,
handoffCount: finalSnapshot.handoffs.length,
tabCount,
transcriptEventCount,
};
}
describe("e2e(client): workbench load", () => {
it.skipIf(!RUN_WORKBENCH_LOAD_E2E)(
"runs a simple sequential load profile against the real backend",
{ timeout: 30 * 60_000 },
async () => {
const endpoint = process.env.HF_E2E_BACKEND_ENDPOINT?.trim() || "http://127.0.0.1:7741/api/rivet";
const workspaceId = process.env.HF_E2E_WORKSPACE?.trim() || "default";
const repoRemote = requiredEnv("HF_E2E_GITHUB_REPO");
const model = workbenchModelEnv("HF_E2E_MODEL", "gpt-4o");
const handoffCount = intEnv("HF_LOAD_HANDOFF_COUNT", 3);
const extraSessionCount = intEnv("HF_LOAD_EXTRA_SESSION_COUNT", 2);
const pollIntervalMs = intEnv("HF_LOAD_POLL_INTERVAL_MS", 2_000);
const client = createBackendClient({
endpoint,
defaultWorkspaceId: workspaceId,
});
const repo = await client.addRepo(workspaceId, repoRemote);
const createHandoffLatencies: number[] = [];
const provisionLatencies: number[] = [];
const createSessionLatencies: number[] = [];
const messageRoundTripLatencies: number[] = [];
const snapshotSeries: Array<{
handoffCount: number;
avgMs: number;
maxMs: number;
payloadBytes: number;
tabCount: number;
transcriptEventCount: number;
}> = [];
snapshotSeries.push(await measureWorkbenchSnapshot(client, workspaceId, 2));
for (let handoffIndex = 0; handoffIndex < handoffCount; handoffIndex += 1) {
const runId = `load-${handoffIndex}-${Date.now().toString(36)}`;
const initialReply = `LOAD_INIT_${runId}`;
const createStartedAt = performance.now();
const created = await client.createWorkbenchHandoff(workspaceId, {
repoId: repo.repoId,
title: `Workbench Load ${runId}`,
branch: `load/${runId}`,
model,
task: `Reply with exactly: ${initialReply}`,
});
createHandoffLatencies.push(performance.now() - createStartedAt);
const provisionStartedAt = performance.now();
const provisioned = await poll(
`handoff ${runId} provisioning`,
12 * 60_000,
pollIntervalMs,
async () => findHandoff(await client.getWorkbench(workspaceId), created.handoffId),
(handoff) => {
const tab = handoff.tabs[0];
return Boolean(
tab &&
handoff.status === "idle" &&
tab.status === "idle" &&
transcriptIncludesAgentText(tab.transcript, initialReply),
);
},
);
provisionLatencies.push(performance.now() - provisionStartedAt);
expect(provisioned.tabs.length).toBeGreaterThan(0);
const primaryTab = provisioned.tabs[0]!;
expect(transcriptIncludesAgentText(primaryTab.transcript, initialReply)).toBe(true);
for (let sessionIndex = 0; sessionIndex < extraSessionCount; sessionIndex += 1) {
const expectedReply = `LOAD_REPLY_${runId}_${sessionIndex}`;
const createSessionStartedAt = performance.now();
const createdSession = await client.createWorkbenchSession(workspaceId, {
handoffId: created.handoffId,
model,
});
createSessionLatencies.push(performance.now() - createSessionStartedAt);
await client.sendWorkbenchMessage(workspaceId, {
handoffId: created.handoffId,
tabId: createdSession.tabId,
text: `Run pwd in the repo, then reply with exactly: ${expectedReply}`,
attachments: [],
});
const messageStartedAt = performance.now();
const withReply = await poll(
`handoff ${runId} session ${sessionIndex} reply`,
10 * 60_000,
pollIntervalMs,
async () => findHandoff(await client.getWorkbench(workspaceId), created.handoffId),
(handoff) => {
const tab = findTab(handoff, createdSession.tabId);
return tab.status === "idle" && transcriptIncludesAgentText(tab.transcript, expectedReply);
},
);
messageRoundTripLatencies.push(performance.now() - messageStartedAt);
expect(transcriptIncludesAgentText(findTab(withReply, createdSession.tabId).transcript, expectedReply)).toBe(true);
}
const snapshotMetrics = await measureWorkbenchSnapshot(client, workspaceId, 3);
snapshotSeries.push(snapshotMetrics);
console.info(
"[workbench-load-snapshot]",
JSON.stringify({
handoffIndex: handoffIndex + 1,
...snapshotMetrics,
}),
);
}
const firstSnapshot = snapshotSeries[0]!;
const lastSnapshot = snapshotSeries[snapshotSeries.length - 1]!;
const summary = {
handoffCount,
extraSessionCount,
createHandoffAvgMs: Math.round(average(createHandoffLatencies)),
provisionAvgMs: Math.round(average(provisionLatencies)),
createSessionAvgMs: Math.round(average(createSessionLatencies)),
messageRoundTripAvgMs: Math.round(average(messageRoundTripLatencies)),
snapshotReadBaselineAvgMs: firstSnapshot.avgMs,
snapshotReadFinalAvgMs: lastSnapshot.avgMs,
snapshotReadFinalMaxMs: lastSnapshot.maxMs,
snapshotPayloadBaselineBytes: firstSnapshot.payloadBytes,
snapshotPayloadFinalBytes: lastSnapshot.payloadBytes,
snapshotTabFinalCount: lastSnapshot.tabCount,
snapshotTranscriptFinalCount: lastSnapshot.transcriptEventCount,
};
console.info("[workbench-load-summary]", JSON.stringify(summary));
expect(createHandoffLatencies.length).toBe(handoffCount);
expect(provisionLatencies.length).toBe(handoffCount);
expect(createSessionLatencies.length).toBe(handoffCount * extraSessionCount);
expect(messageRoundTripLatencies.length).toBe(handoffCount * extraSessionCount);
},
);
});