feat: download batch

This commit is contained in:
Nathan Flurry 2026-02-23 09:51:18 -08:00
parent 3545139cd3
commit e1a09564e4
14 changed files with 702 additions and 91 deletions

View file

@ -28,6 +28,7 @@ import {
type FsMoveResponse,
type FsPathQuery,
type FsStat,
type FsDownloadBatchQuery,
type FsUploadBatchQuery,
type FsUploadBatchResponse,
type FsWriteResponse,
@ -53,6 +54,101 @@ const DEFAULT_REPLAY_MAX_EVENTS = 50;
const DEFAULT_REPLAY_MAX_CHARS = 12_000;
const EVENT_INDEX_SCAN_EVENTS_LIMIT = 500;
function isNodeRuntime(): boolean {
return typeof process !== "undefined" && !!process.versions?.node;
}
type TarModule = {
create: (options: Record<string, unknown>, files: string[]) => Promise<unknown> | unknown;
extract: (options: Record<string, unknown>) => unknown;
};
async function importTarOrThrow(): Promise<TarModule> {
try {
return (await import("tar")) as unknown as TarModule;
} catch {
throw new Error(
"`tar` is required for this operation. Install it (e.g. `npm i tar`) or use the raw byte APIs instead.",
);
}
}
async function createTarBytesFromSourcePath(sourcePath: string): Promise<ArrayBuffer> {
if (!isNodeRuntime()) {
throw new Error("Path-based batch upload requires a Node.js runtime.");
}
const tar = await importTarOrThrow();
const fs = await import("node:fs/promises");
const os = await import("node:os");
const path = await import("node:path");
const stat = await fs.stat(sourcePath);
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "sandbox-agent-upload-"));
const tarPath = path.join(tmpDir, "upload.tar");
try {
if (stat.isDirectory()) {
// Pack directory contents (equivalent to: tar -cf upload.tar -C <dir> .)
await tar.create(
{
file: tarPath,
cwd: sourcePath,
},
["."],
);
} else if (stat.isFile()) {
// Pack a single file as ./<basename>
await tar.create(
{
file: tarPath,
cwd: path.dirname(sourcePath),
},
[path.basename(sourcePath)],
);
} else {
throw new Error(`Unsupported path type for batch upload: ${sourcePath}`);
}
const bytes = await fs.readFile(tarPath);
// Slice to avoid sharing a larger underlying buffer.
return bytes.buffer.slice(bytes.byteOffset, bytes.byteOffset + bytes.byteLength);
} finally {
await fs.rm(tmpDir, { recursive: true, force: true });
}
}
async function writeBytesToPath(outPath: string, bytes: Uint8Array): Promise<void> {
if (!isNodeRuntime()) {
throw new Error("Path-based batch download requires a Node.js runtime.");
}
const fs = await import("node:fs/promises");
const path = await import("node:path");
await fs.mkdir(path.dirname(outPath), { recursive: true });
await fs.writeFile(outPath, bytes);
}
async function extractTarBytesToDir(destDir: string, tarBytes: Uint8Array): Promise<void> {
if (!isNodeRuntime()) {
throw new Error("Extracting batch downloads requires a Node.js runtime.");
}
const tar = await importTarOrThrow();
const fs = await import("node:fs/promises");
const stream = await import("node:stream");
const streamPromises = await import("node:stream/promises");
const buffer = await import("node:buffer");
await fs.mkdir(destDir, { recursive: true });
const readable = new stream.PassThrough();
readable.end(buffer.Buffer.from(tarBytes));
await streamPromises.pipeline(
readable as any,
tar.extract({
cwd: destDir,
}) as any,
);
}
export interface SandboxAgentConnectOptions {
baseUrl: string;
token?: string;
@ -61,6 +157,13 @@ export interface SandboxAgentConnectOptions {
persist?: SessionPersistDriver;
replayMaxEvents?: number;
replayMaxChars?: number;
/**
* Disable the background SSE GET loop for ACP connections. When true,
* all responses are read from POST response bodies. Useful for environments
* where streaming GET requests are not supported (e.g. Cloudflare Workers
* `containerFetch`).
*/
disableSse?: boolean;
}
export interface SandboxAgentStartOptions extends Omit<SandboxAgentConnectOptions, "baseUrl" | "token"> {
@ -207,6 +310,7 @@ export class LiveAcpConnection {
headers?: HeadersInit;
agent: string;
serverId: string;
disableSse?: boolean;
onObservedEnvelope: (
connection: LiveAcpConnection,
envelope: AnyMessage,
@ -225,6 +329,7 @@ export class LiveAcpConnection {
transport: {
path: `${API_PREFIX}/acp/${encodeURIComponent(options.serverId)}`,
bootstrapQuery: { agent: options.agent },
disableSse: options.disableSse,
},
client: {
sessionUpdate: async (_notification: SessionNotification) => {
@ -409,6 +514,7 @@ export class SandboxAgent {
private readonly persist: SessionPersistDriver;
private readonly replayMaxEvents: number;
private readonly replayMaxChars: number;
private readonly disableSse: boolean;
private spawnHandle?: SandboxAgentSpawnHandle;
@ -427,6 +533,7 @@ export class SandboxAgent {
this.replayMaxEvents = normalizePositiveInt(options.replayMaxEvents, DEFAULT_REPLAY_MAX_EVENTS);
this.replayMaxChars = normalizePositiveInt(options.replayMaxChars, DEFAULT_REPLAY_MAX_CHARS);
this.disableSse = options.disableSse ?? false;
if (!this.fetcher) {
throw new Error("Fetch API is not available; provide a fetch implementation.");
@ -454,6 +561,7 @@ export class SandboxAgent {
persist: options.persist,
replayMaxEvents: options.replayMaxEvents,
replayMaxChars: options.replayMaxChars,
disableSse: options.disableSse,
});
client.spawnHandle = handle;
@ -685,16 +793,44 @@ export class SandboxAgent {
return this.requestJson("GET", `${FS_PATH}/stat`, { query });
}
async uploadFsBatch(body: BodyInit, query?: FsUploadBatchQuery): Promise<FsUploadBatchResponse> {
async uploadFsBatch(
body: BodyInit | { sourcePath: string },
query?: FsUploadBatchQuery,
): Promise<FsUploadBatchResponse> {
const resolvedBody =
typeof body === "object" && body !== null && "sourcePath" in body
? await createTarBytesFromSourcePath((body as { sourcePath: string }).sourcePath)
: body;
const response = await this.requestRaw("POST", `${FS_PATH}/upload-batch`, {
query,
rawBody: body,
rawBody: resolvedBody,
contentType: "application/x-tar",
accept: "application/json",
});
return (await response.json()) as FsUploadBatchResponse;
}
async downloadFsBatch(
query: FsDownloadBatchQuery = {},
options?: { outPath?: string; extractTo?: string },
): Promise<Uint8Array> {
const response = await this.requestRaw("GET", `${FS_PATH}/download-batch`, {
query,
accept: "application/x-tar",
});
const buffer = await response.arrayBuffer();
const bytes = new Uint8Array(buffer);
if (options?.outPath) {
await writeBytesToPath(options.outPath, bytes);
}
if (options?.extractTo) {
await extractTarBytesToDir(options.extractTo, bytes);
}
return bytes;
}
async getMcpConfig(query: McpConfigQuery): Promise<McpServerConfig> {
return this.requestJson("GET", `${API_PREFIX}/config/mcp`, { query });
}
@ -733,6 +869,7 @@ export class SandboxAgent {
headers: this.defaultHeaders,
agent,
serverId,
disableSse: this.disableSse,
onObservedEnvelope: (connection, envelope, direction, localSessionId) => {
void this.persistObservedEnvelope(connection, envelope, direction, localSessionId);
},

View file

@ -32,6 +32,14 @@ export interface paths {
put: operations["put_v1_config_skills"];
delete: operations["delete_v1_config_skills"];
};
"/v1/fs/download-batch": {
/**
* Download a tar archive of a file or directory.
* @description Returns `application/x-tar` bytes containing the requested path. If the path is a directory,
* the archive contains its contents (similar to `tar -C <dir> .`).
*/
get: operations["get_v1_fs_download_batch"];
};
"/v1/fs/entries": {
get: operations["get_v1_fs_entries"];
};
@ -141,6 +149,9 @@ export interface components {
path: string;
recursive?: boolean | null;
};
FsDownloadBatchQuery: {
path?: string | null;
};
FsEntriesQuery: {
path?: string | null;
};
@ -599,6 +610,25 @@ export interface operations {
};
};
};
/**
* Download a tar archive of a file or directory.
* @description Returns `application/x-tar` bytes containing the requested path. If the path is a directory,
* the archive contains its contents (similar to `tar -C <dir> .`).
*/
get_v1_fs_download_batch: {
parameters: {
query?: {
/** @description Source path (file or directory) */
path?: string | null;
};
};
responses: {
/** @description tar archive bytes */
200: {
content: never;
};
};
};
get_v1_fs_entries: {
parameters: {
query?: {

View file

@ -38,6 +38,7 @@ export type {
FsEntry,
FsMoveRequest,
FsMoveResponse,
FsDownloadBatchQuery,
FsPathQuery,
FsStat,
FsUploadBatchQuery,

View file

@ -18,6 +18,7 @@ export type FsEntry = components["schemas"]["FsEntry"];
export type FsPathQuery = QueryParams<operations["get_v1_fs_file"]>;
export type FsDeleteQuery = QueryParams<operations["delete_v1_fs_entry"]>;
export type FsUploadBatchQuery = QueryParams<operations["post_v1_fs_upload_batch"]>;
export type FsDownloadBatchQuery = QueryParams<operations["get_v1_fs_download_batch"]>;
export type FsWriteResponse = JsonResponse<operations["put_v1_fs_file"], 200>;
export type FsActionResponse = JsonResponse<operations["delete_v1_fs_entry"], 200>;
export type FsMoveRequest = JsonRequestBody<operations["post_v1_fs_move"]>;