fix(acp): avoid deadlock on permission requests during long POST

This commit is contained in:
Nathan Flurry 2026-02-12 00:31:41 -08:00
parent 7f9460bbbb
commit 1d000581a0
2 changed files with 131 additions and 16 deletions

View file

@ -376,32 +376,61 @@ class StreamableHttpAcpTransport {
});
const url = this.buildUrl(this.bootstrapQueryIfNeeded());
const response = await this.fetcher(url, {
const responsePromise = this.fetcher(url, {
method: "POST",
headers,
body: JSON.stringify(message),
});
this.postedOnce = true;
if (!response.ok) {
throw new AcpHttpError(response.status, await readProblem(response), response);
}
this.ensureSseLoop();
if (response.status === 200) {
const text = await response.text();
if (text.trim()) {
const envelope = JSON.parse(text) as AnyMessage;
this.pushInbound(envelope);
const consumeResponse = async (): Promise<void> => {
const response = await responsePromise;
if (!response.ok) {
throw new AcpHttpError(response.status, await readProblem(response), response);
}
} else {
// Drain response body so the underlying connection is released back to
// the pool. Without this, Node.js undici keeps the socket occupied and
// may stall subsequent requests to the same origin.
await response.text().catch(() => {});
if (response.status === 200) {
const text = await response.text();
if (text.trim()) {
const envelope = JSON.parse(text) as AnyMessage;
this.pushInbound(envelope);
}
} else {
// Drain response body so the underlying connection is released back to
// the pool. Without this, Node.js undici keeps the socket occupied and
// may stall subsequent requests to the same origin.
await response.text().catch(() => {});
}
};
// Don't block subsequent writes (e.g. permission replies) behind long
// running prompt turns; prompt completions arrive via SSE.
if (isRequestMessage(message)) {
consumeResponse().catch((error) => {
this.handleDetachedRequestError(message, error);
});
return;
}
await consumeResponse();
}
private handleDetachedRequestError(message: AnyMessage, error: unknown): void {
const id = requestIdFromMessage(message);
if (id === undefined) {
this.failReadable(error);
return;
}
const rpcError = toRpcError(error);
this.pushInbound({
jsonrpc: "2.0",
id,
error: rpcError,
} as AnyMessage);
}
private ensureSseLoop(): void {
@ -681,5 +710,51 @@ function buildQueryParams(source: Record<string, QueryValue>): URLSearchParams {
return params;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}
function isRequestMessage(message: AnyMessage): boolean {
if (!isRecord(message)) {
return false;
}
const record = message as Record<string, unknown>;
const method = record["method"];
const id = record["id"];
return typeof method === "string" && id !== undefined;
}
function requestIdFromMessage(message: AnyMessage): number | string | null | undefined {
if (!isRecord(message) || !Object.prototype.hasOwnProperty.call(message, "id")) {
return undefined;
}
const record = message as Record<string, unknown>;
const id = record["id"];
if (typeof id === "string" || typeof id === "number" || id === null) {
return id;
}
return undefined;
}
function toRpcError(error: unknown): RpcErrorResponse {
if (error instanceof AcpHttpError) {
return {
code: -32003,
message: error.problem?.title ?? `HTTP ${error.status}`,
data: error.problem ?? { status: error.status },
};
}
if (error instanceof Error) {
return {
code: -32603,
message: error.message,
};
}
return {
code: -32603,
message: String(error),
};
}
export type * from "@agentclientprotocol/sdk";
export { PROTOCOL_VERSION } from "@agentclientprotocol/sdk";

View file

@ -7,8 +7,11 @@ import {
type CancelNotification,
type NewSessionRequest,
type NewSessionResponse,
type PermissionOption,
type PromptRequest,
type PromptResponse,
type RequestPermissionRequest,
type RequestPermissionResponse,
type SessionNotification,
type SetSessionConfigOptionRequest,
type SetSessionModeRequest,
@ -227,6 +230,9 @@ export class LiveAcpConnection {
bootstrapQuery: { agent: options.agent },
},
client: {
requestPermission: async (request: RequestPermissionRequest): Promise<RequestPermissionResponse> => {
return autoSelectPermissionResponse(request);
},
sessionUpdate: async (_notification: SessionNotification) => {
// Session updates are observed via envelope persistence.
},
@ -1011,6 +1017,40 @@ function normalizeSessionInit(
};
}
function autoSelectPermissionResponse(
request: RequestPermissionRequest,
): RequestPermissionResponse {
const chosen = selectPermissionOption(request.options ?? []);
if (!chosen) {
return {
outcome: {
outcome: "cancelled",
},
};
}
return {
outcome: {
outcome: "selected",
optionId: chosen.optionId,
},
};
}
function selectPermissionOption(options: PermissionOption[]): PermissionOption | null {
const allowOnce = options.find((option) => option.kind === "allow_once");
if (allowOnce) {
return allowOnce;
}
const allowAlways = options.find((option) => option.kind === "allow_always");
if (allowAlways) {
return allowAlways;
}
return null;
}
function mapSessionParams(params: Record<string, unknown>, agentSessionId: string): Record<string, unknown> {
return {
...params,