From 18473e56e4424f18d69fc5a9e259a2624acec6f2 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 5 Mar 2026 20:16:15 -0800 Subject: [PATCH] Add abort signal to connect health wait --- docs/sdk-overview.mdx | 16 ++- sdks/typescript/src/client.ts | 154 ++++++++++++++++++++-- sdks/typescript/tests/integration.test.ts | 38 ++++++ 3 files changed, 196 insertions(+), 12 deletions(-) diff --git a/docs/sdk-overview.mdx b/docs/sdk-overview.mdx index e150c4d..53a38f6 100644 --- a/docs/sdk-overview.mdx +++ b/docs/sdk-overview.mdx @@ -39,7 +39,7 @@ const sdk = await SandboxAgent.connect({ }); ``` -`SandboxAgent.connect(...)` now waits for `/v1/health` by default before other SDK requests proceed. To disable that gate, pass `waitForHealth: false`. To keep the default gate but fail after a bounded wait, pass `waitForHealth: { timeoutMs: 120_000 }`. +`SandboxAgent.connect(...)` now waits for `/v1/health` by default before other SDK requests proceed. To disable that gate, pass `waitForHealth: false`. To keep the default gate but fail after a bounded wait, pass `waitForHealth: { timeoutMs: 120_000 }`. To cancel the startup wait early, pass `signal: abortController.signal`. With a custom fetch handler (for example, proxying requests inside Workers): @@ -49,6 +49,19 @@ const sdk = await SandboxAgent.connect({ }); ``` +With an abort signal for the startup health gate: + +```ts +const controller = new AbortController(); + +const sdk = await SandboxAgent.connect({ + baseUrl: "http://127.0.0.1:2468", + signal: controller.signal, +}); + +controller.abort(); +``` + With persistence: ```ts @@ -173,6 +186,7 @@ Parameters: - `headers` (optional): Additional request headers - `fetch` (optional): Custom fetch implementation used by SDK HTTP and ACP calls - `waitForHealth` (optional, defaults to enabled): waits for `/v1/health` before HTTP helpers and ACP session setup proceed; pass `false` to disable or `{ timeoutMs }` to bound the wait +- `signal` (optional): aborts the startup `/v1/health` wait used by `connect()` ## Types diff --git a/sdks/typescript/src/client.ts b/sdks/typescript/src/client.ts index cc23e82..d20cef4 100644 --- a/sdks/typescript/src/client.ts +++ b/sdks/typescript/src/client.ts @@ -67,6 +67,7 @@ interface SandboxAgentConnectCommonOptions { persist?: SessionPersistDriver; replayMaxEvents?: number; replayMaxChars?: number; + signal?: AbortSignal; token?: string; waitForHealth?: boolean | SandboxAgentHealthWaitOptions; } @@ -452,6 +453,7 @@ export class SandboxAgent { private readonly fetcher: typeof fetch; private readonly defaultHeaders?: HeadersInit; private readonly healthWait: NormalizedHealthWaitOptions; + private readonly healthWaitAbortController = new AbortController(); private readonly persist: SessionPersistDriver; private readonly replayMaxEvents: number; @@ -482,7 +484,7 @@ export class SandboxAgent { } this.fetcher = resolvedFetch; this.defaultHeaders = options.headers; - this.healthWait = normalizeHealthWaitOptions(options.waitForHealth); + this.healthWait = normalizeHealthWaitOptions(options.waitForHealth, options.signal); this.persist = options.persist ?? new InMemorySessionPersistDriver(); this.replayMaxEvents = normalizePositiveInt(options.replayMaxEvents, DEFAULT_REPLAY_MAX_EVENTS); @@ -522,6 +524,7 @@ export class SandboxAgent { async dispose(): Promise { this.disposed = true; + this.healthWaitAbortController.abort(createAbortError("SandboxAgent was disposed.")); const connections = [...this.liveConnections.values()]; this.liveConnections.clear(); @@ -985,7 +988,7 @@ export class SandboxAgent { private async requestRaw(method: string, path: string, options: RequestOptions = {}): Promise { if (!options.skipReadyWait) { - await this.awaitHealthy(); + await this.awaitHealthy(options.signal); } const url = this.buildUrl(path, options.query); @@ -1034,18 +1037,23 @@ export class SandboxAgent { }); } - private async awaitHealthy(): Promise { + private async awaitHealthy(signal?: AbortSignal): Promise { if (!this.healthPromise) { + throwIfAborted(signal); return; } - await this.healthPromise; + await waitForAbortable(this.healthPromise, signal); + throwIfAborted(signal); if (this.healthError) { throw this.healthError; } } private async runHealthWait(): Promise { + const signal = this.healthWait.enabled + ? anyAbortSignal([this.healthWait.signal, this.healthWaitAbortController.signal]) + : undefined; const startedAt = Date.now(); const deadline = typeof this.healthWait.timeoutMs === "number" ? startedAt + this.healthWait.timeoutMs : undefined; @@ -1055,13 +1063,21 @@ export class SandboxAgent { let lastError: unknown; while (!this.disposed && (deadline === undefined || Date.now() < deadline)) { + throwIfAborted(signal); + try { - const health = await this.getHealth(); + const health = await this.requestJson("GET", `${API_PREFIX}/health`, { + signal, + skipReadyWait: true, + }); if (health.status === "ok") { return; } lastError = new Error(`Unexpected health response: ${JSON.stringify(health)}`); } catch (error) { + if (isAbortError(error)) { + throw error; + } lastError = error; } @@ -1074,7 +1090,7 @@ export class SandboxAgent { nextLogAt = now + HEALTH_WAIT_LOG_EVERY_MS; } - await sleep(delayMs); + await sleep(delayMs, signal); delayMs = Math.min(HEALTH_WAIT_MAX_DELAY_MS, delayMs * 2); } @@ -1132,8 +1148,8 @@ type RequestOptions = { }; type NormalizedHealthWaitOptions = - | { enabled: false; timeoutMs?: undefined } - | { enabled: true; timeoutMs?: number }; + | { enabled: false; timeoutMs?: undefined; signal?: undefined } + | { enabled: true; timeoutMs?: number; signal?: AbortSignal }; /** * Auto-select and call `authenticate` based on the agent's advertised auth methods. @@ -1297,13 +1313,14 @@ function normalizePositiveInt(value: number | undefined, fallback: number): numb function normalizeHealthWaitOptions( value: boolean | SandboxAgentHealthWaitOptions | undefined, + signal: AbortSignal | undefined, ): NormalizedHealthWaitOptions { if (value === false) { return { enabled: false }; } if (value === true || value === undefined) { - return { enabled: true }; + return { enabled: true, signal }; } const timeoutMs = @@ -1313,6 +1330,7 @@ function normalizeHealthWaitOptions( return { enabled: true, + signal, timeoutMs, }; } @@ -1359,6 +1377,120 @@ function formatHealthWaitError(error: unknown): string { return String(error); } -function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); +function anyAbortSignal(signals: Array): AbortSignal | undefined { + const active = signals.filter((signal): signal is AbortSignal => Boolean(signal)); + if (active.length === 0) { + return undefined; + } + + if (active.length === 1) { + return active[0]; + } + + const controller = new AbortController(); + const onAbort = (event: Event) => { + cleanup(); + const signal = event.target as AbortSignal; + controller.abort(signal.reason ?? createAbortError()); + }; + const cleanup = () => { + for (const signal of active) { + signal.removeEventListener("abort", onAbort); + } + }; + + for (const signal of active) { + if (signal.aborted) { + controller.abort(signal.reason ?? createAbortError()); + return controller.signal; + } + } + + for (const signal of active) { + signal.addEventListener("abort", onAbort, { once: true }); + } + + return controller.signal; +} + +function throwIfAborted(signal: AbortSignal | undefined): void { + if (!signal?.aborted) { + return; + } + + throw signal.reason instanceof Error ? signal.reason : createAbortError(signal.reason); +} + +async function waitForAbortable(promise: Promise, signal: AbortSignal | undefined): Promise { + if (!signal) { + return promise; + } + + throwIfAborted(signal); + + return new Promise((resolve, reject) => { + const onAbort = () => { + cleanup(); + reject(signal.reason instanceof Error ? signal.reason : createAbortError(signal.reason)); + }; + const cleanup = () => { + signal.removeEventListener("abort", onAbort); + }; + + signal.addEventListener("abort", onAbort, { once: true }); + promise.then( + (value) => { + cleanup(); + resolve(value); + }, + (error) => { + cleanup(); + reject(error); + }, + ); + }); +} + +function isAbortError(error: unknown): boolean { + return error instanceof Error && error.name === "AbortError"; +} + +function createAbortError(reason?: unknown): Error { + if (reason instanceof Error) { + return reason; + } + + const message = typeof reason === "string" ? reason : "This operation was aborted."; + if (typeof DOMException !== "undefined") { + return new DOMException(message, "AbortError"); + } + + const error = new Error(message); + error.name = "AbortError"; + return error; +} + +function sleep(ms: number, signal?: AbortSignal): Promise { + if (!signal) { + return new Promise((resolve) => setTimeout(resolve, ms)); + } + + throwIfAborted(signal); + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + cleanup(); + resolve(); + }, ms); + const onAbort = () => { + cleanup(); + reject(signal.reason instanceof Error ? signal.reason : createAbortError(signal.reason)); + }; + const cleanup = () => { + clearTimeout(timer); + signal.removeEventListener("abort", onAbort); + }; + + signal.addEventListener("abort", onAbort, { once: true }); + }); } diff --git a/sdks/typescript/tests/integration.test.ts b/sdks/typescript/tests/integration.test.ts index 806c005..fd20eef 100644 --- a/sdks/typescript/tests/integration.test.ts +++ b/sdks/typescript/tests/integration.test.ts @@ -243,6 +243,44 @@ describe("Integration: TypeScript SDK flat session API", () => { await sdk.dispose(); }); + it("aborts the shared health wait when connect signal is aborted", async () => { + const controller = new AbortController(); + const customFetch: typeof fetch = async (input, init) => { + const outgoing = new Request(input, init); + const parsed = new URL(outgoing.url); + + if (parsed.pathname !== "/v1/health") { + throw new Error(`Unexpected request path during abort test: ${parsed.pathname}`); + } + + return new Promise((_resolve, reject) => { + const onAbort = () => { + outgoing.signal.removeEventListener("abort", onAbort); + reject(outgoing.signal.reason ?? new DOMException("Connect aborted", "AbortError")); + }; + + if (outgoing.signal.aborted) { + onAbort(); + return; + } + + outgoing.signal.addEventListener("abort", onAbort, { once: true }); + }); + }; + + const sdk = await SandboxAgent.connect({ + token, + fetch: customFetch, + signal: controller.signal, + }); + + const pending = sdk.listAgents(); + controller.abort(new DOMException("Connect aborted", "AbortError")); + + await expect(pending).rejects.toThrow("Connect aborted"); + await sdk.dispose(); + }); + it("restores a session on stale connection by recreating and replaying history on first prompt", async () => { const persist = new InMemorySessionPersistDriver({ maxEventsPerSession: 200,