This commit is contained in:
Nathan Flurry 2026-02-09 18:53:00 -08:00
parent a33b1323ff
commit 2ba630c180
264 changed files with 18559 additions and 51021 deletions

View file

@ -0,0 +1,37 @@
{
"name": "acp-http-client",
"version": "0.1.0",
"description": "Protocol-faithful ACP JSON-RPC over streamable HTTP client.",
"license": "Apache-2.0",
"repository": {
"type": "git",
"url": "https://github.com/rivet-dev/sandbox-agent"
},
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
}
},
"dependencies": {
"@agentclientprotocol/sdk": "^0.14.1"
},
"files": [
"dist"
],
"scripts": {
"build": "tsup",
"typecheck": "tsc --noEmit",
"test": "vitest run",
"test:watch": "vitest"
},
"devDependencies": {
"@types/node": "^22.0.0",
"tsup": "^8.0.0",
"typescript": "^5.7.0",
"vitest": "^3.0.0"
}
}

View file

@ -0,0 +1,574 @@
import {
ClientSideConnection,
PROTOCOL_VERSION,
type AnyMessage,
type AuthenticateRequest,
type AuthenticateResponse,
type CancelNotification,
type Client,
type ForkSessionRequest,
type ForkSessionResponse,
type InitializeRequest,
type InitializeResponse,
type ListSessionsRequest,
type ListSessionsResponse,
type LoadSessionRequest,
type LoadSessionResponse,
type NewSessionRequest,
type NewSessionResponse,
type PromptRequest,
type PromptResponse,
type RequestPermissionOutcome,
type RequestPermissionRequest,
type RequestPermissionResponse,
type ResumeSessionRequest,
type ResumeSessionResponse,
type SessionNotification,
type SetSessionConfigOptionRequest,
type SetSessionConfigOptionResponse,
type SetSessionModelRequest,
type SetSessionModelResponse,
type SetSessionModeRequest,
type SetSessionModeResponse,
type Stream,
} from "@agentclientprotocol/sdk";
const ACP_PATH = "/v2/rpc";
export interface ProblemDetails {
type: string;
title: string;
status: number;
detail?: string;
instance?: string;
[key: string]: unknown;
}
export type AcpEnvelopeDirection = "inbound" | "outbound";
export type AcpEnvelopeObserver = (envelope: AnyMessage, direction: AcpEnvelopeDirection) => void;
export interface AcpHttpClientOptions {
baseUrl: string;
token?: string;
fetch?: typeof fetch;
headers?: HeadersInit;
client?: Partial<Client>;
onEnvelope?: AcpEnvelopeObserver;
}
export class AcpHttpError extends Error {
readonly status: number;
readonly problem?: ProblemDetails;
readonly response: Response;
constructor(status: number, problem: ProblemDetails | undefined, response: Response) {
super(problem?.title ?? `Request failed with status ${status}`);
this.name = "AcpHttpError";
this.status = status;
this.problem = problem;
this.response = response;
}
}
export class AcpHttpClient {
private readonly transport: StreamableHttpAcpTransport;
private readonly connection: ClientSideConnection;
constructor(options: AcpHttpClientOptions) {
const fetcher = options.fetch ?? globalThis.fetch?.bind(globalThis);
if (!fetcher) {
throw new Error("Fetch API is not available; provide a fetch implementation.");
}
this.transport = new StreamableHttpAcpTransport({
baseUrl: options.baseUrl,
fetcher,
token: options.token,
defaultHeaders: options.headers,
onEnvelope: options.onEnvelope,
});
const clientHandlers = buildClientHandlers(options.client);
this.connection = new ClientSideConnection(() => clientHandlers, this.transport.stream);
}
get clientId(): string | undefined {
return this.transport.clientId ?? undefined;
}
async initialize(request: Partial<InitializeRequest> = {}): Promise<InitializeResponse> {
const params: InitializeRequest = {
protocolVersion: request.protocolVersion ?? PROTOCOL_VERSION,
clientCapabilities: request.clientCapabilities,
clientInfo: request.clientInfo ?? {
name: "acp-http-client",
version: "v2",
},
};
if (request._meta !== undefined) {
params._meta = request._meta;
}
return this.connection.initialize(params);
}
async authenticate(request: AuthenticateRequest): Promise<AuthenticateResponse> {
return this.connection.authenticate(request);
}
async newSession(request: NewSessionRequest): Promise<NewSessionResponse> {
return this.connection.newSession(request);
}
async loadSession(request: LoadSessionRequest): Promise<LoadSessionResponse> {
return this.connection.loadSession(request);
}
async prompt(request: PromptRequest): Promise<PromptResponse> {
return this.connection.prompt(request);
}
async cancel(notification: CancelNotification): Promise<void> {
return this.connection.cancel(notification);
}
async setSessionMode(request: SetSessionModeRequest): Promise<SetSessionModeResponse | void> {
return this.connection.setSessionMode(request);
}
async setSessionConfigOption(
request: SetSessionConfigOptionRequest,
): Promise<SetSessionConfigOptionResponse> {
return this.connection.setSessionConfigOption(request);
}
async unstableListSessions(request: ListSessionsRequest): Promise<ListSessionsResponse> {
return this.connection.unstable_listSessions(request);
}
async unstableForkSession(request: ForkSessionRequest): Promise<ForkSessionResponse> {
return this.connection.unstable_forkSession(request);
}
async unstableResumeSession(request: ResumeSessionRequest): Promise<ResumeSessionResponse> {
return this.connection.unstable_resumeSession(request);
}
async unstableSetSessionModel(
request: SetSessionModelRequest,
): Promise<SetSessionModelResponse | void> {
return this.connection.unstable_setSessionModel(request);
}
async extMethod(method: string, params: Record<string, unknown>): Promise<Record<string, unknown>> {
return this.connection.extMethod(method, params);
}
async extNotification(method: string, params: Record<string, unknown>): Promise<void> {
return this.connection.extNotification(method, params);
}
async disconnect(): Promise<void> {
await this.transport.close();
}
get closed(): Promise<void> {
return this.connection.closed;
}
get signal(): AbortSignal {
return this.connection.signal;
}
get clientSideConnection(): ClientSideConnection {
return this.connection;
}
}
type StreamableHttpAcpTransportOptions = {
baseUrl: string;
fetcher: typeof fetch;
token?: string;
defaultHeaders?: HeadersInit;
onEnvelope?: AcpEnvelopeObserver;
};
class StreamableHttpAcpTransport {
readonly stream: Stream;
private readonly baseUrl: string;
private readonly fetcher: typeof fetch;
private readonly token?: string;
private readonly defaultHeaders?: HeadersInit;
private readonly onEnvelope?: AcpEnvelopeObserver;
private readableController: ReadableStreamDefaultController<AnyMessage> | null = null;
private sseAbortController: AbortController | null = null;
private sseLoop: Promise<void> | null = null;
private lastEventId: string | null = null;
private closed = false;
private closingPromise: Promise<void> | null = null;
private _clientId: string | null = null;
constructor(options: StreamableHttpAcpTransportOptions) {
this.baseUrl = options.baseUrl.replace(/\/$/, "");
this.fetcher = options.fetcher;
this.token = options.token;
this.defaultHeaders = options.defaultHeaders;
this.onEnvelope = options.onEnvelope;
this.stream = {
readable: new ReadableStream<AnyMessage>({
start: (controller) => {
this.readableController = controller;
},
cancel: async () => {
await this.close();
},
}),
writable: new WritableStream<AnyMessage>({
write: async (message) => {
await this.writeMessage(message);
},
close: async () => {
await this.close();
},
abort: async () => {
await this.close();
},
}),
};
}
get clientId(): string | null {
return this._clientId;
}
async close(): Promise<void> {
if (this.closingPromise) {
return this.closingPromise;
}
this.closingPromise = this.closeImpl();
return this.closingPromise;
}
private async closeImpl(): Promise<void> {
if (this.closed) {
return;
}
this.closed = true;
if (this.sseAbortController) {
this.sseAbortController.abort();
}
const clientId = this._clientId;
if (clientId) {
try {
const response = await this.fetcher(`${this.baseUrl}${ACP_PATH}`, {
method: "DELETE",
headers: this.buildHeaders({
"x-acp-connection-id": clientId,
Accept: "application/json",
}),
});
if (!response.ok && response.status !== 404) {
throw new AcpHttpError(response.status, await readProblem(response), response);
}
} catch {
// Ignore close errors; close must be best effort.
}
}
try {
this.readableController?.close();
} catch {
// no-op
}
this.readableController = null;
}
private async writeMessage(message: AnyMessage): Promise<void> {
if (this.closed) {
throw new Error("ACP client is closed");
}
this.observeEnvelope(message, "outbound");
const headers = this.buildHeaders({
"Content-Type": "application/json",
Accept: "application/json",
});
if (this._clientId) {
headers.set("x-acp-connection-id", this._clientId);
}
const response = await this.fetcher(`${this.baseUrl}${ACP_PATH}`, {
method: "POST",
headers,
body: JSON.stringify(message),
});
if (!response.ok) {
throw new AcpHttpError(response.status, await readProblem(response), response);
}
const responseClientId = response.headers.get("x-acp-connection-id");
if (responseClientId && responseClientId !== this._clientId) {
this._clientId = responseClientId;
this.ensureSseLoop();
}
if (response.status === 200) {
const text = await response.text();
if (text.trim()) {
const envelope = JSON.parse(text) as AnyMessage;
this.pushInbound(envelope);
}
}
}
private ensureSseLoop(): void {
if (this.sseLoop || this.closed || !this._clientId) {
return;
}
this.sseLoop = this.runSseLoop().finally(() => {
this.sseLoop = null;
});
}
private async runSseLoop(): Promise<void> {
while (!this.closed && this._clientId) {
this.sseAbortController = new AbortController();
const headers = this.buildHeaders({
"x-acp-connection-id": this._clientId,
Accept: "text/event-stream",
});
if (this.lastEventId) {
headers.set("Last-Event-ID", this.lastEventId);
}
try {
const response = await this.fetcher(`${this.baseUrl}${ACP_PATH}`, {
method: "GET",
headers,
signal: this.sseAbortController.signal,
});
if (!response.ok) {
throw new AcpHttpError(response.status, await readProblem(response), response);
}
if (!response.body) {
throw new Error("SSE stream is not readable in this environment.");
}
await this.consumeSse(response.body);
if (!this.closed) {
await delay(150);
}
} catch (error) {
if (this.closed || isAbortError(error)) {
return;
}
this.failReadable(error);
return;
}
}
}
private async consumeSse(body: ReadableStream<Uint8Array>): Promise<void> {
const reader = body.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (!this.closed) {
const { done, value } = await reader.read();
if (done) {
return;
}
buffer += decoder.decode(value, { stream: true }).replace(/\r\n/g, "\n");
let separatorIndex = buffer.indexOf("\n\n");
while (separatorIndex !== -1) {
const eventChunk = buffer.slice(0, separatorIndex);
buffer = buffer.slice(separatorIndex + 2);
this.processSseEvent(eventChunk);
separatorIndex = buffer.indexOf("\n\n");
}
}
} finally {
reader.releaseLock();
}
}
private processSseEvent(chunk: string): void {
if (!chunk.trim()) {
return;
}
let eventName = "message";
let eventId: string | null = null;
const dataLines: string[] = [];
for (const line of chunk.split("\n")) {
if (!line || line.startsWith(":")) {
continue;
}
if (line.startsWith("event:")) {
eventName = line.slice(6).trim();
continue;
}
if (line.startsWith("id:")) {
eventId = line.slice(3).trim();
continue;
}
if (line.startsWith("data:")) {
dataLines.push(line.slice(5).trimStart());
}
}
if (eventId) {
this.lastEventId = eventId;
}
if (eventName !== "message" || dataLines.length === 0) {
return;
}
const payloadText = dataLines.join("\n");
if (!payloadText.trim()) {
return;
}
const envelope = JSON.parse(payloadText) as AnyMessage;
this.pushInbound(envelope);
}
private pushInbound(envelope: AnyMessage): void {
if (this.closed) {
return;
}
this.observeEnvelope(envelope, "inbound");
try {
this.readableController?.enqueue(envelope);
} catch (error) {
this.failReadable(error);
}
}
private failReadable(error: unknown): void {
if (this.closed) {
return;
}
this.closed = true;
try {
this.readableController?.error(error);
} catch {
// no-op
}
this.readableController = null;
if (this.sseAbortController) {
this.sseAbortController.abort();
}
}
private observeEnvelope(message: AnyMessage, direction: AcpEnvelopeDirection): void {
if (!this.onEnvelope) {
return;
}
this.onEnvelope(message, direction);
}
private buildHeaders(extra?: HeadersInit): Headers {
const headers = new Headers(this.defaultHeaders ?? undefined);
if (this.token) {
headers.set("Authorization", `Bearer ${this.token}`);
}
if (extra) {
const merged = new Headers(extra);
merged.forEach((value, key) => headers.set(key, value));
}
return headers;
}
}
function buildClientHandlers(client?: Partial<Client>): Client {
const fallbackPermission: RequestPermissionResponse = {
outcome: {
outcome: "cancelled",
} as RequestPermissionOutcome,
};
return {
requestPermission: async (request: RequestPermissionRequest) => {
if (client?.requestPermission) {
return client.requestPermission(request);
}
return fallbackPermission;
},
sessionUpdate: async (notification: SessionNotification) => {
if (client?.sessionUpdate) {
await client.sessionUpdate(notification);
}
},
readTextFile: client?.readTextFile,
writeTextFile: client?.writeTextFile,
createTerminal: client?.createTerminal,
terminalOutput: client?.terminalOutput,
releaseTerminal: client?.releaseTerminal,
waitForTerminalExit: client?.waitForTerminalExit,
killTerminal: client?.killTerminal,
extMethod: client?.extMethod,
extNotification: client?.extNotification,
};
}
async function readProblem(response: Response): Promise<ProblemDetails | undefined> {
try {
const text = await response.clone().text();
if (!text) {
return undefined;
}
return JSON.parse(text) as ProblemDetails;
} catch {
return undefined;
}
}
function isAbortError(error: unknown): boolean {
return error instanceof DOMException && error.name === "AbortError";
}
function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export type * from "@agentclientprotocol/sdk";

View file

@ -0,0 +1,135 @@
import { describe, expect, it, beforeAll, afterAll } from "vitest";
import { existsSync } from "node:fs";
import { dirname, resolve } from "node:path";
import { fileURLToPath } from "node:url";
import { AcpHttpClient, type SessionNotification } from "../src/index.ts";
import { spawnSandboxAgent, type SandboxAgentSpawnHandle } from "../../typescript/src/spawn.ts";
const __dirname = dirname(fileURLToPath(import.meta.url));
function findBinary(): string | null {
if (process.env.SANDBOX_AGENT_BIN) {
return process.env.SANDBOX_AGENT_BIN;
}
const cargoPaths = [
resolve(__dirname, "../../../target/debug/sandbox-agent"),
resolve(__dirname, "../../../target/release/sandbox-agent"),
];
for (const p of cargoPaths) {
if (existsSync(p)) {
return p;
}
}
return null;
}
const BINARY_PATH = findBinary();
if (!BINARY_PATH) {
throw new Error(
"sandbox-agent binary not found. Build it (cargo build -p sandbox-agent) or set SANDBOX_AGENT_BIN.",
);
}
if (!process.env.SANDBOX_AGENT_BIN) {
process.env.SANDBOX_AGENT_BIN = BINARY_PATH;
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function waitFor<T>(
fn: () => T | undefined | null,
timeoutMs = 5000,
stepMs = 25,
): Promise<T> {
const started = Date.now();
while (Date.now() - started < timeoutMs) {
const value = fn();
if (value !== undefined && value !== null) {
return value;
}
await sleep(stepMs);
}
throw new Error("timed out waiting for condition");
}
describe("AcpHttpClient integration", () => {
let handle: SandboxAgentSpawnHandle;
let baseUrl: string;
let token: string;
beforeAll(async () => {
handle = await spawnSandboxAgent({
enabled: true,
log: "silent",
timeoutMs: 30000,
});
baseUrl = handle.baseUrl;
token = handle.token;
});
afterAll(async () => {
await handle.dispose();
});
it("runs initialize/newSession/prompt against real /v2/rpc", async () => {
const updates: SessionNotification[] = [];
const client = new AcpHttpClient({
baseUrl,
token,
client: {
sessionUpdate: async (notification) => {
updates.push(notification);
},
},
});
const initialize = await client.initialize({
_meta: {
"sandboxagent.dev": {
agent: "mock",
},
},
});
expect(initialize.protocolVersion).toBeTruthy();
const session = await client.newSession({
cwd: process.cwd(),
mcpServers: [],
_meta: {
"sandboxagent.dev": {
agent: "mock",
},
},
});
expect(session.sessionId).toBeTruthy();
const prompt = await client.prompt({
sessionId: session.sessionId,
prompt: [{ type: "text", text: "acp package integration" }],
});
expect(prompt.stopReason).toBe("end_turn");
await waitFor(() => {
const text = updates
.flatMap((entry) => {
if (entry.update.sessionUpdate !== "agent_message_chunk") {
return [];
}
const content = entry.update.content;
if (content.type !== "text") {
return [];
}
return [content.text];
})
.join("");
return text.includes("mock: acp package integration") ? text : undefined;
});
await client.disconnect();
});
});

View file

@ -0,0 +1,16 @@
{
"compilerOptions": {
"target": "ES2022",
"lib": ["ES2022", "DOM"],
"module": "ESNext",
"moduleResolution": "Bundler",
"allowImportingTsExtensions": true,
"noEmit": true,
"esModuleInterop": true,
"strict": true,
"skipLibCheck": true,
"resolveJsonModule": true
},
"include": ["src/**/*", "tests/**/*"],
"exclude": ["node_modules", "dist"]
}

View file

@ -0,0 +1,9 @@
import { defineConfig } from "tsup";
export default defineConfig({
entry: ["src/index.ts"],
format: ["esm"],
dts: true,
clean: true,
sourcemap: true,
});

View file

@ -0,0 +1,8 @@
import { defineConfig } from "vitest/config";
export default defineConfig({
test: {
include: ["tests/**/*.test.ts"],
testTimeout: 30000,
},
});