fix(cloudflare): fix streaming responses

This commit is contained in:
Nathan Flurry 2026-02-25 01:39:27 -08:00
parent e24b7cb140
commit a3fe0cc764
6 changed files with 266 additions and 124 deletions

View file

@ -39,12 +39,44 @@ curl http://localhost:8787
Test prompt routing through the SDK with a custom sandbox fetch handler:
```bash
curl -X POST "http://localhost:8787/sandbox/demo/prompt" \
curl -N -X POST "http://localhost:8787/sandbox/demo/prompt" \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{"agent":"codex","prompt":"Reply with one short sentence."}'
```
The response includes `events`, an array of all recorded session events for that prompt.
The response is an SSE stream with events:
- `session.created`
- `session.event`
- `prompt.completed`
- `done`
### Troubleshooting: only two events
If you only see:
- outbound `session/prompt`
- inbound prompt result with `stopReason: "end_turn"`
then ACP `session/update` notifications are not flowing. In Cloudflare sandbox paths this can happen if you forward `AbortSignal` from SDK fetch init into `containerFetch(...)` for long-lived ACP SSE requests.
Use:
```ts
const sdk = await SandboxAgent.connect({
fetch: (input, init) =>
sandbox.containerFetch(
input as Request | string | URL,
{
...(init ?? {}),
// Avoid passing AbortSignal through containerFetch; it can drop ACP SSE updates.
signal: undefined,
},
PORT,
),
});
```
Without `session/update` events, assistant text/tool deltas will not appear in UI streams.
## Deploy

View file

@ -1,7 +1,8 @@
import { getSandbox, type Sandbox } from "@cloudflare/sandbox";
import { Hono } from "hono";
import { HTTPException } from "hono/http-exception";
import { runPromptTest, type PromptTestRequest } from "./prompt-test";
import { streamSSE } from "hono/streaming";
import { runPromptEndpointStream, type PromptRequest } from "./prompt-endpoint";
export { Sandbox } from "@cloudflare/sandbox";
@ -49,7 +50,15 @@ async function getReadySandbox(name: string, env: Bindings): Promise<Sandbox> {
async function proxyToSandbox(sandbox: Sandbox, request: Request, path: string): Promise<Response> {
const query = new URL(request.url).search;
return sandbox.containerFetch(new Request(`http://localhost${path}${query}`, request), PORT);
return sandbox.containerFetch(
`http://localhost${path}${query}`,
{
method: request.method,
headers: request.headers,
body: request.body,
},
PORT,
);
}
const app = new Hono<AppEnv>();
@ -63,15 +72,34 @@ app.post("/sandbox/:name/prompt", async (c) => {
throw new HTTPException(400, { message: "Content-Type must be application/json" });
}
let payload: PromptTestRequest;
let payload: PromptRequest;
try {
payload = await c.req.json<PromptTestRequest>();
payload = await c.req.json<PromptRequest>();
} catch {
throw new HTTPException(400, { message: "Invalid JSON body" });
}
const sandbox = await getReadySandbox(c.req.param("name"), c.env);
return c.json(await runPromptTest(sandbox, payload, PORT));
return streamSSE(c, async (stream) => {
try {
await runPromptEndpointStream(sandbox, payload, PORT, async (event) => {
await stream.writeSSE({
event: event.type,
data: JSON.stringify(event),
});
});
await stream.writeSSE({
event: "done",
data: JSON.stringify({ ok: true }),
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await stream.writeSSE({
event: "error",
data: JSON.stringify({ message }),
});
}
});
});
app.all("/sandbox/:name/proxy/*", async (c) => {

View file

@ -0,0 +1,66 @@
import type { Sandbox } from "@cloudflare/sandbox";
import { SandboxAgent } from "sandbox-agent";
export type PromptRequest = {
agent?: string;
prompt?: string;
};
export async function runPromptEndpointStream(
sandbox: Sandbox,
request: PromptRequest,
port: number,
emit: (event: { type: string; [key: string]: unknown }) => Promise<void> | void,
): Promise<void> {
const client = await SandboxAgent.connect({
fetch: (req, init) =>
sandbox.containerFetch(
req,
{
...(init ?? {}),
// Cloudflare containerFetch may drop long-lived update streams when
// a forwarded AbortSignal is cancelled; clear it for this path.
signal: undefined,
},
port,
),
});
let unsubscribe: (() => void) | undefined;
try {
const session = await client.createSession({
agent: request.agent ?? "codex",
});
const promptText =
request.prompt?.trim() || "Reply with a short confirmation.";
await emit({
type: "session.created",
sessionId: session.id,
agent: session.agent,
prompt: promptText,
});
let pendingWrites: Promise<void> = Promise.resolve();
unsubscribe = session.onEvent((event) => {
pendingWrites = pendingWrites
.then(async () => {
await emit({ type: "session.event", event });
})
.catch(() => {});
});
const response = await session.prompt([{ type: "text", text: promptText }]);
await pendingWrites;
await emit({ type: "prompt.response", response });
await emit({ type: "prompt.completed" });
} finally {
if (unsubscribe) {
unsubscribe();
}
await Promise.race([
client.dispose(),
new Promise((resolve) => setTimeout(resolve, 250)),
]);
}
}

View file

@ -1,66 +0,0 @@
import type { Sandbox } from "@cloudflare/sandbox";
import { SandboxAgent } from "sandbox-agent";
export type PromptTestRequest = {
agent?: string;
prompt?: string;
};
export type PromptTestResponse = {
sessionId: string;
agent: string;
prompt: string;
events: unknown[];
};
export async function runPromptTest(
sandbox: Sandbox,
request: PromptTestRequest,
port: number,
): Promise<PromptTestResponse> {
const client = await SandboxAgent.connect({
fetch: (req, init) =>
sandbox.containerFetch(req, init, port),
});
let sessionId: string | null = null;
try {
const session = await client.createSession({
agent: request.agent ?? "codex",
});
sessionId = session.id;
const promptText =
request.prompt?.trim() || "Reply with a short confirmation.";
await session.prompt([{ type: "text", text: promptText }]);
const events: unknown[] = [];
let cursor: string | undefined;
while (true) {
const page = await client.getEvents({
sessionId: session.id,
cursor,
limit: 200,
});
events.push(...page.items);
if (!page.nextCursor) break;
cursor = page.nextCursor;
}
return {
sessionId: session.id,
agent: session.agent,
prompt: promptText,
events,
};
} finally {
if (sessionId) {
try {
await client.destroySession(sessionId);
} catch {
// Ignore cleanup failures; session teardown is best-effort.
}
}
await client.dispose();
}
}