From 1d000581a073cc0d1767321a6e0aef42f25b82c4 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 12 Feb 2026 00:31:41 -0800 Subject: [PATCH] fix(acp): avoid deadlock on permission requests during long POST --- sdks/acp-http-client/src/index.ts | 107 +++++++++++++++++++++++++----- sdks/typescript/src/client.ts | 40 +++++++++++ 2 files changed, 131 insertions(+), 16 deletions(-) diff --git a/sdks/acp-http-client/src/index.ts b/sdks/acp-http-client/src/index.ts index 976a464..a517417 100644 --- a/sdks/acp-http-client/src/index.ts +++ b/sdks/acp-http-client/src/index.ts @@ -376,32 +376,61 @@ class StreamableHttpAcpTransport { }); const url = this.buildUrl(this.bootstrapQueryIfNeeded()); - const response = await this.fetcher(url, { + const responsePromise = this.fetcher(url, { method: "POST", headers, body: JSON.stringify(message), }); this.postedOnce = true; - - if (!response.ok) { - throw new AcpHttpError(response.status, await readProblem(response), response); - } - this.ensureSseLoop(); - if (response.status === 200) { - const text = await response.text(); - if (text.trim()) { - const envelope = JSON.parse(text) as AnyMessage; - this.pushInbound(envelope); + const consumeResponse = async (): Promise => { + const response = await responsePromise; + + if (!response.ok) { + throw new AcpHttpError(response.status, await readProblem(response), response); } - } else { - // Drain response body so the underlying connection is released back to - // the pool. Without this, Node.js undici keeps the socket occupied and - // may stall subsequent requests to the same origin. - await response.text().catch(() => {}); + + if (response.status === 200) { + const text = await response.text(); + if (text.trim()) { + const envelope = JSON.parse(text) as AnyMessage; + this.pushInbound(envelope); + } + } else { + // Drain response body so the underlying connection is released back to + // the pool. Without this, Node.js undici keeps the socket occupied and + // may stall subsequent requests to the same origin. + await response.text().catch(() => {}); + } + }; + + // Don't block subsequent writes (e.g. permission replies) behind long + // running prompt turns; prompt completions arrive via SSE. + if (isRequestMessage(message)) { + consumeResponse().catch((error) => { + this.handleDetachedRequestError(message, error); + }); + return; } + + await consumeResponse(); + } + + private handleDetachedRequestError(message: AnyMessage, error: unknown): void { + const id = requestIdFromMessage(message); + if (id === undefined) { + this.failReadable(error); + return; + } + + const rpcError = toRpcError(error); + this.pushInbound({ + jsonrpc: "2.0", + id, + error: rpcError, + } as AnyMessage); } private ensureSseLoop(): void { @@ -681,5 +710,51 @@ function buildQueryParams(source: Record): URLSearchParams { return params; } +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null; +} + +function isRequestMessage(message: AnyMessage): boolean { + if (!isRecord(message)) { + return false; + } + const record = message as Record; + const method = record["method"]; + const id = record["id"]; + return typeof method === "string" && id !== undefined; +} + +function requestIdFromMessage(message: AnyMessage): number | string | null | undefined { + if (!isRecord(message) || !Object.prototype.hasOwnProperty.call(message, "id")) { + return undefined; + } + const record = message as Record; + const id = record["id"]; + if (typeof id === "string" || typeof id === "number" || id === null) { + return id; + } + return undefined; +} + +function toRpcError(error: unknown): RpcErrorResponse { + if (error instanceof AcpHttpError) { + return { + code: -32003, + message: error.problem?.title ?? `HTTP ${error.status}`, + data: error.problem ?? { status: error.status }, + }; + } + if (error instanceof Error) { + return { + code: -32603, + message: error.message, + }; + } + return { + code: -32603, + message: String(error), + }; +} + export type * from "@agentclientprotocol/sdk"; export { PROTOCOL_VERSION } from "@agentclientprotocol/sdk"; diff --git a/sdks/typescript/src/client.ts b/sdks/typescript/src/client.ts index e724e24..ce67078 100644 --- a/sdks/typescript/src/client.ts +++ b/sdks/typescript/src/client.ts @@ -7,8 +7,11 @@ import { type CancelNotification, type NewSessionRequest, type NewSessionResponse, + type PermissionOption, type PromptRequest, type PromptResponse, + type RequestPermissionRequest, + type RequestPermissionResponse, type SessionNotification, type SetSessionConfigOptionRequest, type SetSessionModeRequest, @@ -227,6 +230,9 @@ export class LiveAcpConnection { bootstrapQuery: { agent: options.agent }, }, client: { + requestPermission: async (request: RequestPermissionRequest): Promise => { + return autoSelectPermissionResponse(request); + }, sessionUpdate: async (_notification: SessionNotification) => { // Session updates are observed via envelope persistence. }, @@ -1011,6 +1017,40 @@ function normalizeSessionInit( }; } +function autoSelectPermissionResponse( + request: RequestPermissionRequest, +): RequestPermissionResponse { + const chosen = selectPermissionOption(request.options ?? []); + if (!chosen) { + return { + outcome: { + outcome: "cancelled", + }, + }; + } + + return { + outcome: { + outcome: "selected", + optionId: chosen.optionId, + }, + }; +} + +function selectPermissionOption(options: PermissionOption[]): PermissionOption | null { + const allowOnce = options.find((option) => option.kind === "allow_once"); + if (allowOnce) { + return allowOnce; + } + + const allowAlways = options.find((option) => option.kind === "allow_always"); + if (allowAlways) { + return allowAlways; + } + + return null; +} + function mapSessionParams(params: Record, agentSessionId: string): Record { return { ...params,