feat: add turn streaming and inspector updates

This commit is contained in:
Nathan Flurry 2026-01-27 06:18:43 -08:00
parent bf58891edf
commit 34d4f3693e
49 changed files with 4629 additions and 1146 deletions

View file

@ -13,6 +13,7 @@ import type {
ProblemDetails,
QuestionReplyRequest,
SessionListResponse,
TurnStreamQuery,
UniversalEvent,
} from "./types.ts";
@ -142,45 +143,37 @@ export class SandboxAgent {
});
}
async postMessageStream(
sessionId: string,
request: MessageRequest,
query?: TurnStreamQuery,
signal?: AbortSignal,
): Promise<Response> {
return this.requestRaw("POST", `${API_PREFIX}/sessions/${encodeURIComponent(sessionId)}/messages/stream`, {
query,
body: request,
accept: "text/event-stream",
signal,
});
}
async *streamEvents(
sessionId: string,
query?: EventsQuery,
signal?: AbortSignal,
): AsyncGenerator<UniversalEvent, void, void> {
const response = await this.getEventsSse(sessionId, query, signal);
if (!response.body) {
throw new Error("SSE stream is not readable in this environment.");
}
yield* this.parseSseStream(response);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// Normalize CRLF to LF for consistent parsing
buffer += decoder.decode(value, { stream: true }).replace(/\r\n/g, "\n");
let index = buffer.indexOf("\n\n");
while (index !== -1) {
const chunk = buffer.slice(0, index);
buffer = buffer.slice(index + 2);
const dataLines = chunk
.split("\n")
.filter((line) => line.startsWith("data:"));
if (dataLines.length > 0) {
const payload = dataLines
.map((line) => line.slice(5).trim())
.join("\n");
if (payload) {
yield JSON.parse(payload) as UniversalEvent;
}
}
index = buffer.indexOf("\n\n");
}
}
async *streamTurn(
sessionId: string,
request: MessageRequest,
query?: TurnStreamQuery,
signal?: AbortSignal,
): AsyncGenerator<UniversalEvent, void, void> {
const response = await this.postMessageStream(sessionId, request, query, signal);
yield* this.parseSseStream(response);
}
async replyQuestion(
@ -297,6 +290,42 @@ export class SandboxAgent {
return undefined;
}
}
private async *parseSseStream(response: Response): AsyncGenerator<UniversalEvent, void, void> {
if (!response.body) {
throw new Error("SSE stream is not readable in this environment.");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
// Normalize CRLF to LF for consistent parsing
buffer += decoder.decode(value, { stream: true }).replace(/\r\n/g, "\n");
let index = buffer.indexOf("\n\n");
while (index !== -1) {
const chunk = buffer.slice(0, index);
buffer = buffer.slice(index + 2);
const dataLines = chunk
.split("\n")
.filter((line) => line.startsWith("data:"));
if (dataLines.length > 0) {
const payload = dataLines
.map((line) => line.slice(5).trim())
.join("\n");
if (payload) {
yield JSON.parse(payload) as UniversalEvent;
}
}
index = buffer.indexOf("\n\n");
}
}
}
}
const normalizeSpawnOptions = (

View file

@ -32,6 +32,9 @@ export interface paths {
"/v1/sessions/{session_id}/messages": {
post: operations["post_message"];
};
"/v1/sessions/{session_id}/messages/stream": {
post: operations["post_message_stream"];
};
"/v1/sessions/{session_id}/permissions/{permission_id}/reply": {
post: operations["reply_permission"];
};
@ -258,6 +261,9 @@ export interface components {
};
/** @enum {string} */
TerminatedBy: "agent" | "daemon";
TurnStreamQuery: {
includeRaw?: boolean | null;
};
UniversalEvent: {
data: components["schemas"]["UniversalEventData"];
event_id: string;
@ -480,6 +486,34 @@ export interface operations {
};
};
};
post_message_stream: {
parameters: {
query?: {
/** @description Include raw provider payloads */
include_raw?: boolean | null;
};
path: {
/** @description Session id */
session_id: string;
};
};
requestBody: {
content: {
"application/json": components["schemas"]["MessageRequest"];
};
};
responses: {
/** @description SSE event stream */
200: {
content: never;
};
404: {
content: {
"application/json": components["schemas"]["ProblemDetails"];
};
};
};
};
reply_permission: {
parameters: {
path: {

View file

@ -41,6 +41,7 @@ export type {
SessionListResponse,
SessionStartedData,
TerminatedBy,
TurnStreamQuery,
UniversalEvent,
UniversalEventData,
UniversalEventType,

View file

@ -39,6 +39,7 @@ export type SessionInfo = S["SessionInfo"];
export type SessionListResponse = S["SessionListResponse"];
export type SessionStartedData = S["SessionStartedData"];
export type TerminatedBy = S["TerminatedBy"];
export type TurnStreamQuery = S["TurnStreamQuery"];
export type UniversalEvent = S["UniversalEvent"];
export type UniversalEventData = S["UniversalEventData"];
export type UniversalEventType = S["UniversalEventType"];