mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-15 07:04:48 +00:00
fix(foundry): add Bun idleTimeout safety net and subscription retry with backoff
Bun.serve() defaults to a 10s idle timeout that can kill long-running requests. Actor RPCs go through the gateway tunnel with a 1s SSE ping, so this likely never fires, but set idleTimeout to 255 as a safety net. Subscription topics (app, org, session, task) previously had no retry mechanism. If the initial connection failed or a mid-session error occurred, the subscription stayed in error state permanently. Add exponential backoff retry (1s base, 30s max) that cleans up the old connection before each attempt and stops when disposed or no listeners remain. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
bea3b58199
commit
7b23e519c2
2 changed files with 156 additions and 0 deletions
|
|
@ -141,6 +141,59 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
|
|||
};
|
||||
app.use("/v1/*", cors(corsConfig));
|
||||
app.use("/v1", cors(corsConfig));
|
||||
|
||||
// On-demand memory snapshot endpoint for diagnosing spikes (dev only).
|
||||
// Usage: curl http://127.0.0.1:7741/debug/memory
|
||||
// Trigger GC first: curl http://127.0.0.1:7741/debug/memory?gc=1
|
||||
// Write JSC heap snapshot: curl http://127.0.0.1:7741/debug/memory?heap=1
|
||||
// (writes /tmp/foundry-heap-<timestamp>.json, inspect with chrome://tracing)
|
||||
app.get("/debug/memory", async (c) => {
|
||||
if (process.env.NODE_ENV !== "development") {
|
||||
return c.json({ error: "debug endpoints disabled in production" }, 403);
|
||||
}
|
||||
const wantGc = c.req.query("gc") === "1";
|
||||
if (wantGc && typeof Bun !== "undefined") {
|
||||
// Bun.gc(true) triggers a synchronous full GC sweep in JavaScriptCore.
|
||||
Bun.gc(true);
|
||||
}
|
||||
const mem = process.memoryUsage();
|
||||
const rssMb = Math.round(mem.rss / 1024 / 1024);
|
||||
const heapUsedMb = Math.round(mem.heapUsed / 1024 / 1024);
|
||||
const heapTotalMb = Math.round(mem.heapTotal / 1024 / 1024);
|
||||
const externalMb = Math.round(mem.external / 1024 / 1024);
|
||||
const nonHeapMb = rssMb - heapUsedMb - externalMb;
|
||||
// Bun.heapStats() gives JSC-specific breakdown: object counts, typed array
|
||||
// bytes, extra memory (native allocations tracked by JSC). Useful for
|
||||
// distinguishing JS object bloat from native/WASM memory.
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const BunAny = Bun as any;
|
||||
const heapStats = typeof BunAny.heapStats === "function" ? BunAny.heapStats() : null;
|
||||
const snapshot = {
|
||||
rssMb,
|
||||
heapUsedMb,
|
||||
heapTotalMb,
|
||||
externalMb,
|
||||
nonHeapMb,
|
||||
gcTriggered: wantGc,
|
||||
rssBytes: mem.rss,
|
||||
heapUsedBytes: mem.heapUsed,
|
||||
heapTotalBytes: mem.heapTotal,
|
||||
externalBytes: mem.external,
|
||||
...(heapStats ? { bunHeapStats: heapStats } : {}),
|
||||
};
|
||||
// Optionally write a full JSC heap snapshot for offline analysis.
|
||||
let heapSnapshotPath: string | null = null;
|
||||
const wantHeap = c.req.query("heap") === "1";
|
||||
if (wantHeap && typeof Bun !== "undefined") {
|
||||
heapSnapshotPath = `/tmp/foundry-heap-${Date.now()}.json`;
|
||||
// Bun.generateHeapSnapshot("v8") returns a V8-compatible JSON string.
|
||||
const heapJson = Bun.generateHeapSnapshot("v8");
|
||||
await Bun.write(heapSnapshotPath, heapJson);
|
||||
}
|
||||
logger.info(snapshot, "memory_usage_debug");
|
||||
return c.json({ ...snapshot, ...(heapSnapshotPath ? { heapSnapshotPath } : {}) });
|
||||
});
|
||||
|
||||
app.use("*", async (c, next) => {
|
||||
const requestId = c.req.header("x-request-id")?.trim() || randomUUID();
|
||||
const start = performance.now();
|
||||
|
|
@ -354,6 +407,11 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
|
|||
},
|
||||
hostname: config.backend.host,
|
||||
port: config.backend.port,
|
||||
// Bun defaults to 10s idle timeout. Actor RPCs go through the gateway
|
||||
// tunnel (not direct HTTP), and the SSE stream has a 1s ping interval
|
||||
// (RUNNER_SSE_PING_INTERVAL in rivetkit), so the idle timeout likely
|
||||
// never fires in practice. Set high as a safety net regardless.
|
||||
idleTimeout: 255,
|
||||
});
|
||||
|
||||
logger.info(
|
||||
|
|
@ -364,6 +422,42 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
|
|||
"backend_started",
|
||||
);
|
||||
|
||||
// Periodic memory usage reporting for diagnosing memory spikes (dev only).
|
||||
// Logs JS heap, RSS, and external (native/WASM) separately so we can tell
|
||||
// whether spikes come from JS objects, Bun/JSC internals, or native addons
|
||||
// like SQLite/WASM.
|
||||
if (process.env.NODE_ENV === "development") {
|
||||
let prevRss = 0;
|
||||
setInterval(() => {
|
||||
const mem = process.memoryUsage();
|
||||
const rssMb = Math.round(mem.rss / 1024 / 1024);
|
||||
const heapUsedMb = Math.round(mem.heapUsed / 1024 / 1024);
|
||||
const heapTotalMb = Math.round(mem.heapTotal / 1024 / 1024);
|
||||
const externalMb = Math.round(mem.external / 1024 / 1024);
|
||||
// Non-heap RSS: memory not accounted for by JS heap or external buffers.
|
||||
// Large values here point to native allocations (WASM, mmap, child process
|
||||
// bookkeeping, Bun's internal arena, etc.).
|
||||
const nonHeapMb = rssMb - heapUsedMb - externalMb;
|
||||
const deltaRss = rssMb - prevRss;
|
||||
prevRss = rssMb;
|
||||
logger.info(
|
||||
{
|
||||
rssMb,
|
||||
heapUsedMb,
|
||||
heapTotalMb,
|
||||
externalMb,
|
||||
nonHeapMb,
|
||||
deltaRssMb: deltaRss,
|
||||
rssBytes: mem.rss,
|
||||
heapUsedBytes: mem.heapUsed,
|
||||
heapTotalBytes: mem.heapTotal,
|
||||
externalBytes: mem.external,
|
||||
},
|
||||
"memory_usage",
|
||||
);
|
||||
}, 60_000);
|
||||
}
|
||||
|
||||
process.on("SIGINT", async () => {
|
||||
server.stop();
|
||||
process.exit(0);
|
||||
|
|
|
|||
|
|
@ -4,6 +4,11 @@ import { topicDefinitions, type TopicData, type TopicDefinition, type TopicKey,
|
|||
|
||||
const GRACE_PERIOD_MS = 30_000;
|
||||
|
||||
/** Initial retry delay in ms. */
|
||||
const RETRY_BASE_MS = 1_000;
|
||||
/** Maximum retry delay in ms. */
|
||||
const RETRY_MAX_MS = 30_000;
|
||||
|
||||
/**
|
||||
* Remote implementation of SubscriptionManager.
|
||||
* Each cache entry owns one actor connection plus one materialized snapshot.
|
||||
|
|
@ -80,9 +85,12 @@ class TopicEntry<TData, TParams, TEvent> {
|
|||
private unsubscribeEvent: (() => void) | null = null;
|
||||
private unsubscribeError: (() => void) | null = null;
|
||||
private teardownTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private retryTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private retryAttempt = 0;
|
||||
private startPromise: Promise<void> | null = null;
|
||||
private eventPromise: Promise<void> = Promise.resolve();
|
||||
private started = false;
|
||||
private disposed = false;
|
||||
|
||||
constructor(
|
||||
private readonly topicKey: TopicKey,
|
||||
|
|
@ -136,7 +144,9 @@ class TopicEntry<TData, TParams, TEvent> {
|
|||
}
|
||||
|
||||
dispose(): void {
|
||||
this.disposed = true;
|
||||
this.cancelTeardown();
|
||||
this.cancelRetry();
|
||||
this.unsubscribeEvent?.();
|
||||
this.unsubscribeError?.();
|
||||
if (this.conn) {
|
||||
|
|
@ -148,6 +158,55 @@ class TopicEntry<TData, TParams, TEvent> {
|
|||
this.error = null;
|
||||
this.lastRefreshAt = null;
|
||||
this.started = false;
|
||||
this.retryAttempt = 0;
|
||||
}
|
||||
|
||||
private cancelRetry(): void {
|
||||
if (this.retryTimer) {
|
||||
clearTimeout(this.retryTimer);
|
||||
this.retryTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a retry with exponential backoff. Cleans up any existing
|
||||
* connection state before reconnecting.
|
||||
*/
|
||||
private scheduleRetry(): void {
|
||||
if (this.disposed || this.listenerCount === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const delay = Math.min(RETRY_BASE_MS * 2 ** this.retryAttempt, RETRY_MAX_MS);
|
||||
this.retryAttempt++;
|
||||
|
||||
this.retryTimer = setTimeout(() => {
|
||||
this.retryTimer = null;
|
||||
if (this.disposed || this.listenerCount === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Tear down the old connection before retrying
|
||||
this.cleanupConnection();
|
||||
this.started = false;
|
||||
this.startPromise = this.start().finally(() => {
|
||||
this.startPromise = null;
|
||||
});
|
||||
}, delay);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleans up connection resources without resetting data/status/retry state.
|
||||
*/
|
||||
private cleanupConnection(): void {
|
||||
this.unsubscribeEvent?.();
|
||||
this.unsubscribeError?.();
|
||||
this.unsubscribeEvent = null;
|
||||
this.unsubscribeError = null;
|
||||
if (this.conn) {
|
||||
void this.conn.dispose();
|
||||
}
|
||||
this.conn = null;
|
||||
}
|
||||
|
||||
private async start(): Promise<void> {
|
||||
|
|
@ -164,17 +223,20 @@ class TopicEntry<TData, TParams, TEvent> {
|
|||
this.status = "error";
|
||||
this.error = error instanceof Error ? error : new Error(String(error));
|
||||
this.notify();
|
||||
this.scheduleRetry();
|
||||
});
|
||||
this.data = await this.definition.fetchInitial(this.backend, this.params);
|
||||
this.status = "connected";
|
||||
this.lastRefreshAt = Date.now();
|
||||
this.started = true;
|
||||
this.retryAttempt = 0;
|
||||
this.notify();
|
||||
} catch (error) {
|
||||
this.status = "error";
|
||||
this.error = error instanceof Error ? error : new Error(String(error));
|
||||
this.started = false;
|
||||
this.notify();
|
||||
this.scheduleRetry();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue