Add abort signal to connect health wait

This commit is contained in:
Nathan Flurry 2026-03-05 20:16:15 -08:00
parent 7d4914c4a2
commit 18473e56e4
3 changed files with 196 additions and 12 deletions

View file

@ -39,7 +39,7 @@ const sdk = await SandboxAgent.connect({
});
```
`SandboxAgent.connect(...)` now waits for `/v1/health` by default before other SDK requests proceed. To disable that gate, pass `waitForHealth: false`. To keep the default gate but fail after a bounded wait, pass `waitForHealth: { timeoutMs: 120_000 }`.
`SandboxAgent.connect(...)` now waits for `/v1/health` by default before other SDK requests proceed. To disable that gate, pass `waitForHealth: false`. To keep the default gate but fail after a bounded wait, pass `waitForHealth: { timeoutMs: 120_000 }`. To cancel the startup wait early, pass `signal: abortController.signal`.
With a custom fetch handler (for example, proxying requests inside Workers):
@ -49,6 +49,19 @@ const sdk = await SandboxAgent.connect({
});
```
With an abort signal for the startup health gate:
```ts
const controller = new AbortController();
const sdk = await SandboxAgent.connect({
baseUrl: "http://127.0.0.1:2468",
signal: controller.signal,
});
controller.abort();
```
With persistence:
```ts
@ -173,6 +186,7 @@ Parameters:
- `headers` (optional): Additional request headers
- `fetch` (optional): Custom fetch implementation used by SDK HTTP and ACP calls
- `waitForHealth` (optional, defaults to enabled): waits for `/v1/health` before HTTP helpers and ACP session setup proceed; pass `false` to disable or `{ timeoutMs }` to bound the wait
- `signal` (optional): aborts the startup `/v1/health` wait used by `connect()`
## Types

View file

@ -67,6 +67,7 @@ interface SandboxAgentConnectCommonOptions {
persist?: SessionPersistDriver;
replayMaxEvents?: number;
replayMaxChars?: number;
signal?: AbortSignal;
token?: string;
waitForHealth?: boolean | SandboxAgentHealthWaitOptions;
}
@ -452,6 +453,7 @@ export class SandboxAgent {
private readonly fetcher: typeof fetch;
private readonly defaultHeaders?: HeadersInit;
private readonly healthWait: NormalizedHealthWaitOptions;
private readonly healthWaitAbortController = new AbortController();
private readonly persist: SessionPersistDriver;
private readonly replayMaxEvents: number;
@ -482,7 +484,7 @@ export class SandboxAgent {
}
this.fetcher = resolvedFetch;
this.defaultHeaders = options.headers;
this.healthWait = normalizeHealthWaitOptions(options.waitForHealth);
this.healthWait = normalizeHealthWaitOptions(options.waitForHealth, options.signal);
this.persist = options.persist ?? new InMemorySessionPersistDriver();
this.replayMaxEvents = normalizePositiveInt(options.replayMaxEvents, DEFAULT_REPLAY_MAX_EVENTS);
@ -522,6 +524,7 @@ export class SandboxAgent {
async dispose(): Promise<void> {
this.disposed = true;
this.healthWaitAbortController.abort(createAbortError("SandboxAgent was disposed."));
const connections = [...this.liveConnections.values()];
this.liveConnections.clear();
@ -985,7 +988,7 @@ export class SandboxAgent {
private async requestRaw(method: string, path: string, options: RequestOptions = {}): Promise<Response> {
if (!options.skipReadyWait) {
await this.awaitHealthy();
await this.awaitHealthy(options.signal);
}
const url = this.buildUrl(path, options.query);
@ -1034,18 +1037,23 @@ export class SandboxAgent {
});
}
private async awaitHealthy(): Promise<void> {
private async awaitHealthy(signal?: AbortSignal): Promise<void> {
if (!this.healthPromise) {
throwIfAborted(signal);
return;
}
await this.healthPromise;
await waitForAbortable(this.healthPromise, signal);
throwIfAborted(signal);
if (this.healthError) {
throw this.healthError;
}
}
private async runHealthWait(): Promise<void> {
const signal = this.healthWait.enabled
? anyAbortSignal([this.healthWait.signal, this.healthWaitAbortController.signal])
: undefined;
const startedAt = Date.now();
const deadline =
typeof this.healthWait.timeoutMs === "number" ? startedAt + this.healthWait.timeoutMs : undefined;
@ -1055,13 +1063,21 @@ export class SandboxAgent {
let lastError: unknown;
while (!this.disposed && (deadline === undefined || Date.now() < deadline)) {
throwIfAborted(signal);
try {
const health = await this.getHealth();
const health = await this.requestJson<HealthResponse>("GET", `${API_PREFIX}/health`, {
signal,
skipReadyWait: true,
});
if (health.status === "ok") {
return;
}
lastError = new Error(`Unexpected health response: ${JSON.stringify(health)}`);
} catch (error) {
if (isAbortError(error)) {
throw error;
}
lastError = error;
}
@ -1074,7 +1090,7 @@ export class SandboxAgent {
nextLogAt = now + HEALTH_WAIT_LOG_EVERY_MS;
}
await sleep(delayMs);
await sleep(delayMs, signal);
delayMs = Math.min(HEALTH_WAIT_MAX_DELAY_MS, delayMs * 2);
}
@ -1132,8 +1148,8 @@ type RequestOptions = {
};
type NormalizedHealthWaitOptions =
| { enabled: false; timeoutMs?: undefined }
| { enabled: true; timeoutMs?: number };
| { enabled: false; timeoutMs?: undefined; signal?: undefined }
| { enabled: true; timeoutMs?: number; signal?: AbortSignal };
/**
* Auto-select and call `authenticate` based on the agent's advertised auth methods.
@ -1297,13 +1313,14 @@ function normalizePositiveInt(value: number | undefined, fallback: number): numb
function normalizeHealthWaitOptions(
value: boolean | SandboxAgentHealthWaitOptions | undefined,
signal: AbortSignal | undefined,
): NormalizedHealthWaitOptions {
if (value === false) {
return { enabled: false };
}
if (value === true || value === undefined) {
return { enabled: true };
return { enabled: true, signal };
}
const timeoutMs =
@ -1313,6 +1330,7 @@ function normalizeHealthWaitOptions(
return {
enabled: true,
signal,
timeoutMs,
};
}
@ -1359,6 +1377,120 @@ function formatHealthWaitError(error: unknown): string {
return String(error);
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
function anyAbortSignal(signals: Array<AbortSignal | undefined>): AbortSignal | undefined {
const active = signals.filter((signal): signal is AbortSignal => Boolean(signal));
if (active.length === 0) {
return undefined;
}
if (active.length === 1) {
return active[0];
}
const controller = new AbortController();
const onAbort = (event: Event) => {
cleanup();
const signal = event.target as AbortSignal;
controller.abort(signal.reason ?? createAbortError());
};
const cleanup = () => {
for (const signal of active) {
signal.removeEventListener("abort", onAbort);
}
};
for (const signal of active) {
if (signal.aborted) {
controller.abort(signal.reason ?? createAbortError());
return controller.signal;
}
}
for (const signal of active) {
signal.addEventListener("abort", onAbort, { once: true });
}
return controller.signal;
}
function throwIfAborted(signal: AbortSignal | undefined): void {
if (!signal?.aborted) {
return;
}
throw signal.reason instanceof Error ? signal.reason : createAbortError(signal.reason);
}
async function waitForAbortable<T>(promise: Promise<T>, signal: AbortSignal | undefined): Promise<T> {
if (!signal) {
return promise;
}
throwIfAborted(signal);
return new Promise<T>((resolve, reject) => {
const onAbort = () => {
cleanup();
reject(signal.reason instanceof Error ? signal.reason : createAbortError(signal.reason));
};
const cleanup = () => {
signal.removeEventListener("abort", onAbort);
};
signal.addEventListener("abort", onAbort, { once: true });
promise.then(
(value) => {
cleanup();
resolve(value);
},
(error) => {
cleanup();
reject(error);
},
);
});
}
function isAbortError(error: unknown): boolean {
return error instanceof Error && error.name === "AbortError";
}
function createAbortError(reason?: unknown): Error {
if (reason instanceof Error) {
return reason;
}
const message = typeof reason === "string" ? reason : "This operation was aborted.";
if (typeof DOMException !== "undefined") {
return new DOMException(message, "AbortError");
}
const error = new Error(message);
error.name = "AbortError";
return error;
}
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
if (!signal) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
throwIfAborted(signal);
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
cleanup();
resolve();
}, ms);
const onAbort = () => {
cleanup();
reject(signal.reason instanceof Error ? signal.reason : createAbortError(signal.reason));
};
const cleanup = () => {
clearTimeout(timer);
signal.removeEventListener("abort", onAbort);
};
signal.addEventListener("abort", onAbort, { once: true });
});
}

View file

@ -243,6 +243,44 @@ describe("Integration: TypeScript SDK flat session API", () => {
await sdk.dispose();
});
it("aborts the shared health wait when connect signal is aborted", async () => {
const controller = new AbortController();
const customFetch: typeof fetch = async (input, init) => {
const outgoing = new Request(input, init);
const parsed = new URL(outgoing.url);
if (parsed.pathname !== "/v1/health") {
throw new Error(`Unexpected request path during abort test: ${parsed.pathname}`);
}
return new Promise<Response>((_resolve, reject) => {
const onAbort = () => {
outgoing.signal.removeEventListener("abort", onAbort);
reject(outgoing.signal.reason ?? new DOMException("Connect aborted", "AbortError"));
};
if (outgoing.signal.aborted) {
onAbort();
return;
}
outgoing.signal.addEventListener("abort", onAbort, { once: true });
});
};
const sdk = await SandboxAgent.connect({
token,
fetch: customFetch,
signal: controller.signal,
});
const pending = sdk.listAgents();
controller.abort(new DOMException("Connect aborted", "AbortError"));
await expect(pending).rejects.toThrow("Connect aborted");
await sdk.dispose();
});
it("restores a session on stale connection by recreating and replaying history on first prompt", async () => {
const persist = new InMemorySessionPersistDriver({
maxEventsPerSession: 200,