Add SDK health wait gate (#206)

* Add SDK health wait gate

* Default connect to waiting for health

* Document connect health wait default

* Add abort signal to connect health wait

* Refactor SDK health probe helper

* Update quickstart health wait note

* Remove example health polling

* Fix docker example codex startup
This commit is contained in:
Nathan Flurry 2026-03-06 00:05:06 -08:00 committed by GitHub
parent 4335ef6af6
commit e7343e14bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 423 additions and 95 deletions

View file

@ -67,13 +67,23 @@ const DEFAULT_BASE_URL = "http://sandbox-agent";
const DEFAULT_REPLAY_MAX_EVENTS = 50;
const DEFAULT_REPLAY_MAX_CHARS = 12_000;
const EVENT_INDEX_SCAN_EVENTS_LIMIT = 500;
const HEALTH_WAIT_MIN_DELAY_MS = 500;
const HEALTH_WAIT_MAX_DELAY_MS = 15_000;
const HEALTH_WAIT_LOG_AFTER_MS = 5_000;
const HEALTH_WAIT_LOG_EVERY_MS = 10_000;
export interface SandboxAgentHealthWaitOptions {
timeoutMs?: number;
}
interface SandboxAgentConnectCommonOptions {
headers?: HeadersInit;
persist?: SessionPersistDriver;
replayMaxEvents?: number;
replayMaxChars?: number;
signal?: AbortSignal;
token?: string;
waitForHealth?: boolean | SandboxAgentHealthWaitOptions;
}
export type SandboxAgentConnectOptions =
@ -477,12 +487,17 @@ export class SandboxAgent {
private readonly token?: string;
private readonly fetcher: typeof fetch;
private readonly defaultHeaders?: HeadersInit;
private readonly healthWait: NormalizedHealthWaitOptions;
private readonly healthWaitAbortController = new AbortController();
private readonly persist: SessionPersistDriver;
private readonly replayMaxEvents: number;
private readonly replayMaxChars: number;
private spawnHandle?: SandboxAgentSpawnHandle;
private healthPromise?: Promise<void>;
private healthError?: Error;
private disposed = false;
private readonly liveConnections = new Map<string, LiveAcpConnection>();
private readonly pendingLiveConnections = new Map<string, Promise<LiveAcpConnection>>();
@ -504,10 +519,13 @@ export class SandboxAgent {
}
this.fetcher = resolvedFetch;
this.defaultHeaders = options.headers;
this.healthWait = normalizeHealthWaitOptions(options.waitForHealth, options.signal);
this.persist = options.persist ?? new InMemorySessionPersistDriver();
this.replayMaxEvents = normalizePositiveInt(options.replayMaxEvents, DEFAULT_REPLAY_MAX_EVENTS);
this.replayMaxChars = normalizePositiveInt(options.replayMaxChars, DEFAULT_REPLAY_MAX_CHARS);
this.startHealthWait();
}
static async connect(options: SandboxAgentConnectOptions): Promise<SandboxAgent> {
@ -529,6 +547,7 @@ export class SandboxAgent {
token: handle.token,
fetch: options.fetch,
headers: options.headers,
waitForHealth: false,
persist: options.persist,
replayMaxEvents: options.replayMaxEvents,
replayMaxChars: options.replayMaxChars,
@ -539,6 +558,9 @@ export class SandboxAgent {
}
async dispose(): Promise<void> {
this.disposed = true;
this.healthWaitAbortController.abort(createAbortError("SandboxAgent was disposed."));
const connections = [...this.liveConnections.values()];
this.liveConnections.clear();
const pending = [...this.pendingLiveConnections.values()];
@ -706,7 +728,7 @@ export class SandboxAgent {
}
async getHealth(): Promise<HealthResponse> {
return this.requestJson("GET", `${API_PREFIX}/health`);
return this.requestHealth();
}
async listAgents(options?: AgentQueryOptions): Promise<AgentListResponse> {
@ -935,6 +957,8 @@ export class SandboxAgent {
}
private async getLiveConnection(agent: string): Promise<LiveAcpConnection> {
await this.awaitHealthy();
const existing = this.liveConnections.get(agent);
if (existing) {
return existing;
@ -1115,6 +1139,7 @@ export class SandboxAgent {
headers: options.headers,
accept: options.accept ?? "application/json",
signal: options.signal,
skipReadyWait: options.skipReadyWait,
});
if (response.status === 204) {
@ -1125,6 +1150,10 @@ export class SandboxAgent {
}
private async requestRaw(method: string, path: string, options: RequestOptions = {}): Promise<Response> {
if (!options.skipReadyWait) {
await this.awaitHealthy(options.signal);
}
const url = this.buildUrl(path, options.query);
const headers = this.buildHeaders(options.headers);
@ -1161,6 +1190,79 @@ export class SandboxAgent {
return response;
}
private startHealthWait(): void {
if (!this.healthWait.enabled || this.healthPromise) {
return;
}
this.healthPromise = this.runHealthWait().catch((error) => {
this.healthError = error instanceof Error ? error : new Error(String(error));
});
}
private async awaitHealthy(signal?: AbortSignal): Promise<void> {
if (!this.healthPromise) {
throwIfAborted(signal);
return;
}
await waitForAbortable(this.healthPromise, signal);
throwIfAborted(signal);
if (this.healthError) {
throw this.healthError;
}
}
private async runHealthWait(): Promise<void> {
const signal = this.healthWait.enabled
? anyAbortSignal([this.healthWait.signal, this.healthWaitAbortController.signal])
: undefined;
const startedAt = Date.now();
const deadline =
typeof this.healthWait.timeoutMs === "number" ? startedAt + this.healthWait.timeoutMs : undefined;
let delayMs = HEALTH_WAIT_MIN_DELAY_MS;
let nextLogAt = startedAt + HEALTH_WAIT_LOG_AFTER_MS;
let lastError: unknown;
while (!this.disposed && (deadline === undefined || Date.now() < deadline)) {
throwIfAborted(signal);
try {
const health = await this.requestHealth({ signal });
if (health.status === "ok") {
return;
}
lastError = new Error(`Unexpected health response: ${JSON.stringify(health)}`);
} catch (error) {
if (isAbortError(error)) {
throw error;
}
lastError = error;
}
const now = Date.now();
if (now >= nextLogAt) {
const details = formatHealthWaitError(lastError);
console.warn(
`sandbox-agent at ${this.baseUrl} is not healthy after ${now - startedAt}ms; still waiting (${details})`,
);
nextLogAt = now + HEALTH_WAIT_LOG_EVERY_MS;
}
await sleep(delayMs, signal);
delayMs = Math.min(HEALTH_WAIT_MAX_DELAY_MS, delayMs * 2);
}
if (this.disposed) {
return;
}
throw new Error(
`Timed out waiting for sandbox-agent health after ${this.healthWait.timeoutMs}ms (${formatHealthWaitError(lastError)})`,
);
}
private buildHeaders(extra?: HeadersInit): Headers {
const headers = new Headers(this.defaultHeaders ?? undefined);
@ -1190,6 +1292,13 @@ export class SandboxAgent {
return url.toString();
}
private async requestHealth(options: { signal?: AbortSignal } = {}): Promise<HealthResponse> {
return this.requestJson("GET", `${API_PREFIX}/health`, {
signal: options.signal,
skipReadyWait: true,
});
}
}
type QueryValue = string | number | boolean | null | undefined;
@ -1202,8 +1311,13 @@ type RequestOptions = {
headers?: HeadersInit;
accept?: string;
signal?: AbortSignal;
skipReadyWait?: boolean;
};
type NormalizedHealthWaitOptions =
| { enabled: false; timeoutMs?: undefined; signal?: undefined }
| { enabled: true; timeoutMs?: number; signal?: AbortSignal };
/**
* Auto-select and call `authenticate` based on the agent's advertised auth methods.
* Prefers env-var-based methods that the server process already has configured.
@ -1375,6 +1489,30 @@ function normalizePositiveInt(value: number | undefined, fallback: number): numb
return Math.floor(value as number);
}
function normalizeHealthWaitOptions(
value: boolean | SandboxAgentHealthWaitOptions | undefined,
signal: AbortSignal | undefined,
): NormalizedHealthWaitOptions {
if (value === false) {
return { enabled: false };
}
if (value === true || value === undefined) {
return { enabled: true, signal };
}
const timeoutMs =
typeof value.timeoutMs === "number" && Number.isFinite(value.timeoutMs) && value.timeoutMs > 0
? Math.floor(value.timeoutMs)
: undefined;
return {
enabled: true,
signal,
timeoutMs,
};
}
function normalizeSpawnOptions(
spawn: SandboxAgentSpawnOptions | boolean | undefined,
defaultEnabled: boolean,
@ -1405,6 +1543,92 @@ async function readProblem(response: Response): Promise<ProblemDetails | undefin
}
}
function formatHealthWaitError(error: unknown): string {
if (error instanceof Error && error.message) {
return error.message;
}
if (error === undefined || error === null) {
return "unknown error";
}
return String(error);
}
function anyAbortSignal(signals: Array<AbortSignal | undefined>): AbortSignal | undefined {
const active = signals.filter((signal): signal is AbortSignal => Boolean(signal));
if (active.length === 0) {
return undefined;
}
if (active.length === 1) {
return active[0];
}
const controller = new AbortController();
const onAbort = (event: Event) => {
cleanup();
const signal = event.target as AbortSignal;
controller.abort(signal.reason ?? createAbortError());
};
const cleanup = () => {
for (const signal of active) {
signal.removeEventListener("abort", onAbort);
}
};
for (const signal of active) {
if (signal.aborted) {
controller.abort(signal.reason ?? createAbortError());
return controller.signal;
}
}
for (const signal of active) {
signal.addEventListener("abort", onAbort, { once: true });
}
return controller.signal;
}
function throwIfAborted(signal: AbortSignal | undefined): void {
if (!signal?.aborted) {
return;
}
throw signal.reason instanceof Error ? signal.reason : createAbortError(signal.reason);
}
async function waitForAbortable<T>(promise: Promise<T>, signal: AbortSignal | undefined): Promise<T> {
if (!signal) {
return promise;
}
throwIfAborted(signal);
return new Promise<T>((resolve, reject) => {
const onAbort = () => {
cleanup();
reject(signal.reason instanceof Error ? signal.reason : createAbortError(signal.reason));
};
const cleanup = () => {
signal.removeEventListener("abort", onAbort);
};
signal.addEventListener("abort", onAbort, { once: true });
promise.then(
(value) => {
cleanup();
resolve(value);
},
(error) => {
cleanup();
reject(error);
},
);
});
}
async function consumeProcessLogSse(
body: ReadableStream<Uint8Array>,
listener: ProcessLogListener,
@ -1494,3 +1718,43 @@ function toWebSocketUrl(url: string): string {
function isAbortError(error: unknown): boolean {
return error instanceof Error && error.name === "AbortError";
}
function createAbortError(reason?: unknown): Error {
if (reason instanceof Error) {
return reason;
}
const message = typeof reason === "string" ? reason : "This operation was aborted.";
if (typeof DOMException !== "undefined") {
return new DOMException(message, "AbortError");
}
const error = new Error(message);
error.name = "AbortError";
return error;
}
function sleep(ms: number, signal?: AbortSignal): Promise<void> {
if (!signal) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
throwIfAborted(signal);
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
cleanup();
resolve();
}, ms);
const onAbort = () => {
cleanup();
reject(signal.reason instanceof Error ? signal.reason : createAbortError(signal.reason));
};
const cleanup = () => {
clearTimeout(timer);
signal.removeEventListener("abort", onAbort);
};
signal.addEventListener("abort", onAbort, { once: true });
});
}

View file

@ -10,6 +10,7 @@ export { AcpRpcError } from "acp-http-client";
export { buildInspectorUrl } from "./inspector.ts";
export type {
SandboxAgentHealthWaitOptions,
AgentQueryOptions,
ProcessLogFollowQuery,
ProcessLogListener,

View file

@ -337,6 +337,111 @@ describe("Integration: TypeScript SDK flat session API", () => {
);
});
it("waits for health before non-ACP HTTP helpers", async () => {
const defaultFetch = globalThis.fetch;
if (!defaultFetch) {
throw new Error("Global fetch is not available in this runtime.");
}
let healthAttempts = 0;
const seenPaths: string[] = [];
const customFetch: typeof fetch = async (input, init) => {
const outgoing = new Request(input, init);
const parsed = new URL(outgoing.url);
seenPaths.push(parsed.pathname);
if (parsed.pathname === "/v1/health") {
healthAttempts += 1;
if (healthAttempts < 3) {
return new Response("warming up", { status: 503 });
}
}
const forwardedUrl = new URL(`${parsed.pathname}${parsed.search}`, baseUrl);
const forwarded = new Request(forwardedUrl.toString(), outgoing);
return defaultFetch(forwarded);
};
const sdk = await SandboxAgent.connect({
token,
fetch: customFetch,
});
const agents = await sdk.listAgents();
expect(Array.isArray(agents.agents)).toBe(true);
expect(healthAttempts).toBe(3);
const firstAgentsRequest = seenPaths.indexOf("/v1/agents");
expect(firstAgentsRequest).toBeGreaterThanOrEqual(0);
expect(seenPaths.slice(0, firstAgentsRequest)).toEqual([
"/v1/health",
"/v1/health",
"/v1/health",
]);
await sdk.dispose();
});
it("surfaces health timeout when a request awaits readiness", async () => {
const customFetch: typeof fetch = async (input, init) => {
const outgoing = new Request(input, init);
const parsed = new URL(outgoing.url);
if (parsed.pathname === "/v1/health") {
return new Response("warming up", { status: 503 });
}
throw new Error(`Unexpected request path during timeout test: ${parsed.pathname}`);
};
const sdk = await SandboxAgent.connect({
token,
fetch: customFetch,
waitForHealth: { timeoutMs: 100 },
});
await expect(sdk.listAgents()).rejects.toThrow("Timed out waiting for sandbox-agent health");
await sdk.dispose();
});
it("aborts the shared health wait when connect signal is aborted", async () => {
const controller = new AbortController();
const customFetch: typeof fetch = async (input, init) => {
const outgoing = new Request(input, init);
const parsed = new URL(outgoing.url);
if (parsed.pathname !== "/v1/health") {
throw new Error(`Unexpected request path during abort test: ${parsed.pathname}`);
}
return new Promise<Response>((_resolve, reject) => {
const onAbort = () => {
outgoing.signal.removeEventListener("abort", onAbort);
reject(outgoing.signal.reason ?? new DOMException("Connect aborted", "AbortError"));
};
if (outgoing.signal.aborted) {
onAbort();
return;
}
outgoing.signal.addEventListener("abort", onAbort, { once: true });
});
};
const sdk = await SandboxAgent.connect({
token,
fetch: customFetch,
signal: controller.signal,
});
const pending = sdk.listAgents();
controller.abort(new DOMException("Connect aborted", "AbortError"));
await expect(pending).rejects.toThrow("Connect aborted");
await sdk.dispose();
});
it("restores a session on stale connection by recreating and replaying history on first prompt", async () => {
const persist = new InMemorySessionPersistDriver({
maxEventsPerSession: 200,