From 9e11f49d175b69abf670a302903175e984298907 Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Sun, 8 Mar 2026 16:20:56 -0700 Subject: [PATCH] Fix runtime memory review issues Address runtime memory review feedback around sqlite compatibility, shutdown ordering, and endpoint validation. Co-authored-by: Codex --- .../coding-agent/src/core/agent-session.ts | 32 +++- .../coding-agent/src/core/gateway/runtime.ts | 18 +- .../src/core/memory/runtime-memory.ts | 173 +++++++++++++----- 3 files changed, 171 insertions(+), 52 deletions(-) diff --git a/packages/coding-agent/src/core/agent-session.ts b/packages/coding-agent/src/core/agent-session.ts index d08633d..48c58bf 100644 --- a/packages/coding-agent/src/core/agent-session.ts +++ b/packages/coding-agent/src/core/agent-session.ts @@ -348,6 +348,7 @@ export class AgentSession { private _baseSystemPrompt = ""; private _memoryManager: RuntimeMemoryManager; private _memoryWriteQueue: Promise = Promise.resolve(); + private _memoryDisposePromise: Promise | null = null; constructor(config: AgentSessionConfig) { this.agent = config.agent; @@ -540,7 +541,7 @@ export class AgentSession { await this._checkCompaction(msg); if (msg.stopReason !== "error") { - this._enqueueMemoryPromotion(event.messages); + this._enqueueMemoryPromotion([...this.agent.state.messages]); } } } @@ -696,9 +697,7 @@ export class AgentSession { dispose(): void { this._disconnectFromAgent(); this._eventListeners = []; - void this._memoryWriteQueue.finally(() => { - this._memoryManager.dispose(); - }); + void this._disposeMemoryManager(); } // ========================================================================= @@ -889,6 +888,31 @@ export class AgentSession { } } + private async _disposeMemoryManager(): Promise { + if (this._memoryDisposePromise) { + await this._memoryDisposePromise; + return; + } + + this._memoryDisposePromise = (async () => { + try { + await this._agentEventQueue; + } catch { + // Event processing failures should not block shutdown. + } + + try { + await this._memoryWriteQueue; + } catch { + // Memory writes are best-effort during shutdown too. + } + + this._memoryManager.dispose(); + })(); + + await this._memoryDisposePromise; + } + private _enqueueMemoryPromotion(messages: AgentMessage[]): void { this._memoryWriteQueue = this._memoryWriteQueue .catch(() => undefined) diff --git a/packages/coding-agent/src/core/gateway/runtime.ts b/packages/coding-agent/src/core/gateway/runtime.ts index 2a7d007..8623047 100644 --- a/packages/coding-agent/src/core/gateway/runtime.ts +++ b/packages/coding-agent/src/core/gateway/runtime.ts @@ -698,15 +698,23 @@ export class GatewayRuntime { if (method === "POST" && path === "/memory/forget") { const body = await this.readJsonBody(request); + const id = + typeof body.id === "number" && Number.isFinite(body.id) + ? Math.floor(body.id) + : undefined; + const key = typeof body.key === "string" ? body.key : undefined; + if (id === undefined && !key) { + this.writeJson(response, 400, { + error: "Memory forget requires an id or key", + }); + return; + } const sessionKey = typeof body.sessionKey === "string" ? body.sessionKey : undefined; const memorySession = await this.resolveMemorySession(sessionKey); const result = await memorySession.forgetMemory({ - id: - typeof body.id === "number" && Number.isFinite(body.id) - ? Math.floor(body.id) - : undefined, - key: typeof body.key === "string" ? body.key : undefined, + id, + key, }); this.writeJson(response, 200, result); return; diff --git a/packages/coding-agent/src/core/memory/runtime-memory.ts b/packages/coding-agent/src/core/memory/runtime-memory.ts index ed8454a..f249f4f 100644 --- a/packages/coding-agent/src/core/memory/runtime-memory.ts +++ b/packages/coding-agent/src/core/memory/runtime-memory.ts @@ -6,7 +6,7 @@ import { readFileSync, statSync, } from "node:fs"; -import { DatabaseSync } from "node:sqlite"; +import { createRequire } from "node:module"; import { homedir } from "node:os"; import { basename, join, resolve } from "node:path"; import type { AgentMessage } from "@mariozechner/pi-agent-core"; @@ -26,6 +26,7 @@ const DEFAULT_CORE_TOKEN_BUDGET = 700; const DEFAULT_RECALL_RESULTS = 4; const DEFAULT_WRITER_MAX_TOKENS = 600; const CUSTOM_MEMORY_TYPE = "companion_memory"; +const require = createRequire(import.meta.url); const MEMORY_WRITER_SYSTEM_PROMPT = `You manage long-term conversational memory for a companion agent. @@ -164,6 +165,50 @@ interface LegacyMemoryFile { body: string; } +interface SqliteStatementResult { + changes: number; + lastInsertRowid: number | bigint; +} + +interface SqliteStatement { + run(...args: unknown[]): SqliteStatementResult; + get(...args: unknown[]): unknown; + all(...args: unknown[]): unknown[]; +} + +interface SqliteDatabase { + exec(sql: string): void; + prepare(sql: string): SqliteStatement; + close(): void; +} + +type SqliteDatabaseConstructor = new (path: string) => SqliteDatabase; + +let cachedSqliteDatabaseConstructor: + | SqliteDatabaseConstructor + | null + | undefined; + +function loadSqliteDatabaseConstructor(): SqliteDatabaseConstructor | null { + if (cachedSqliteDatabaseConstructor !== undefined) { + return cachedSqliteDatabaseConstructor; + } + + try { + const sqliteModule = require("node:sqlite") as { + DatabaseSync?: SqliteDatabaseConstructor; + }; + cachedSqliteDatabaseConstructor = + typeof sqliteModule.DatabaseSync === "function" + ? sqliteModule.DatabaseSync + : null; + } catch { + cachedSqliteDatabaseConstructor = null; + } + + return cachedSqliteDatabaseConstructor; +} + function asRecord(value: unknown): Record | null { if (typeof value !== "object" || value === null || Array.isArray(value)) { return null; @@ -530,7 +575,7 @@ export class RuntimeMemoryManager { }; private readonly identity: RuntimeMemoryIdentity | null; private readonly dbPath: string | null; - private readonly database: DatabaseSync | null; + private readonly database: SqliteDatabase | null; constructor(params: { sessionManager: ReadonlySessionManager; @@ -547,11 +592,18 @@ export class RuntimeMemoryManager { return; } - mkdirSync(this.settings.storageDir, { recursive: true }); this.dbPath = join( this.settings.storageDir, buildDbFileName(this.identity), ); + + const DatabaseSync = loadSqliteDatabaseConstructor(); + if (!DatabaseSync) { + this.database = null; + return; + } + + mkdirSync(this.settings.storageDir, { recursive: true }); this.database = new DatabaseSync(this.dbPath); this.database.exec("PRAGMA journal_mode = WAL;"); this.database.exec("PRAGMA busy_timeout = 5000;"); @@ -610,7 +662,7 @@ export class RuntimeMemoryManager { } getStatus(): RuntimeMemoryStatus { - if (!this.database || !this.identity) { + if (!this.identity) { return { enabled: this.settings.enabled, ready: false, @@ -625,6 +677,21 @@ export class RuntimeMemoryManager { }; } + if (!this.database) { + return { + enabled: this.settings.enabled, + ready: false, + identity: this.identity, + storagePath: this.dbPath, + coreCount: 0, + archivalCount: 0, + episodeCount: 0, + lastMemoryWriteAt: null, + lastEpisodeAt: null, + legacyImportComplete: false, + }; + } + const counts = this.database .prepare( `SELECT @@ -811,47 +878,69 @@ export class RuntimeMemoryManager { .run(now, now, existing.id); return this.getMemoryById(existing.id); } - - this.database - .prepare( - `UPDATE memories - SET active = 0, superseded_at = ? - WHERE id = ?`, - ) - .run(now, existing.id); } + let newId = 0; + this.database.exec("BEGIN IMMEDIATE;"); + try { + if (existing) { + this.database + .prepare( + `UPDATE memories + SET active = 0, superseded_at = ? + WHERE id = ?`, + ) + .run(now, existing.id); + } - const insertResult = this.database - .prepare( - `INSERT INTO memories ( - bucket, - kind, - memory_key, - content, - search_text, - source, - active, - created_at, - updated_at - ) VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?)`, - ) - .run( - bucket, - kind, - memoryKey, - content, - createSearchText({ + const insertResult = this.database + .prepare( + `INSERT INTO memories ( + bucket, + kind, + memory_key, + content, + search_text, + source, + active, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?)`, + ) + .run( bucket, kind, - key: memoryKey, + memoryKey, content, - }), - input.source ?? "manual", - now, - now, - ); + createSearchText({ + bucket, + kind, + key: memoryKey, + content, + }), + input.source ?? "manual", + now, + now, + ); - return this.getMemoryById(Number(insertResult.lastInsertRowid)); + newId = Number(insertResult.lastInsertRowid); + + if (existing) { + this.database + .prepare( + `UPDATE memories + SET superseded_by_id = ? + WHERE id = ?`, + ) + .run(newId, existing.id); + } + + this.database.exec("COMMIT;"); + } catch (error) { + this.database.exec("ROLLBACK;"); + throw error; + } + + return this.getMemoryById(newId); } forget(input: RuntimeMemoryForgetInput): { ok: true; forgotten: boolean } { @@ -928,8 +1017,6 @@ export class RuntimeMemoryManager { .run(createEpisodeSearchText(row.role, row.text), row.id); } - this.database.exec("VACUUM;"); - return { ok: true, memoryRows: memoryRows.length, @@ -1022,9 +1109,9 @@ export class RuntimeMemoryManager { }; return [ - ...messages.slice(0, lastUserIndex + 1), + ...messages.slice(0, lastUserIndex), injectedMessage, - ...messages.slice(lastUserIndex + 1), + ...messages.slice(lastUserIndex), ]; }