Fix runtime memory review issues

Address runtime memory review feedback around sqlite compatibility, shutdown ordering, and endpoint validation.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Harivansh Rathi 2026-03-08 16:20:56 -07:00
parent 9c5dde8615
commit 9e11f49d17
3 changed files with 171 additions and 52 deletions

View file

@ -348,6 +348,7 @@ export class AgentSession {
private _baseSystemPrompt = "";
private _memoryManager: RuntimeMemoryManager;
private _memoryWriteQueue: Promise<void> = Promise.resolve();
private _memoryDisposePromise: Promise<void> | null = null;
constructor(config: AgentSessionConfig) {
this.agent = config.agent;
@ -540,7 +541,7 @@ export class AgentSession {
await this._checkCompaction(msg);
if (msg.stopReason !== "error") {
this._enqueueMemoryPromotion(event.messages);
this._enqueueMemoryPromotion([...this.agent.state.messages]);
}
}
}
@ -696,9 +697,7 @@ export class AgentSession {
dispose(): void {
this._disconnectFromAgent();
this._eventListeners = [];
void this._memoryWriteQueue.finally(() => {
this._memoryManager.dispose();
});
void this._disposeMemoryManager();
}
// =========================================================================
@ -889,6 +888,31 @@ export class AgentSession {
}
}
private async _disposeMemoryManager(): Promise<void> {
if (this._memoryDisposePromise) {
await this._memoryDisposePromise;
return;
}
this._memoryDisposePromise = (async () => {
try {
await this._agentEventQueue;
} catch {
// Event processing failures should not block shutdown.
}
try {
await this._memoryWriteQueue;
} catch {
// Memory writes are best-effort during shutdown too.
}
this._memoryManager.dispose();
})();
await this._memoryDisposePromise;
}
private _enqueueMemoryPromotion(messages: AgentMessage[]): void {
this._memoryWriteQueue = this._memoryWriteQueue
.catch(() => undefined)

View file

@ -698,15 +698,23 @@ export class GatewayRuntime {
if (method === "POST" && path === "/memory/forget") {
const body = await this.readJsonBody(request);
const id =
typeof body.id === "number" && Number.isFinite(body.id)
? Math.floor(body.id)
: undefined;
const key = typeof body.key === "string" ? body.key : undefined;
if (id === undefined && !key) {
this.writeJson(response, 400, {
error: "Memory forget requires an id or key",
});
return;
}
const sessionKey =
typeof body.sessionKey === "string" ? body.sessionKey : undefined;
const memorySession = await this.resolveMemorySession(sessionKey);
const result = await memorySession.forgetMemory({
id:
typeof body.id === "number" && Number.isFinite(body.id)
? Math.floor(body.id)
: undefined,
key: typeof body.key === "string" ? body.key : undefined,
id,
key,
});
this.writeJson(response, 200, result);
return;

View file

@ -6,7 +6,7 @@ import {
readFileSync,
statSync,
} from "node:fs";
import { DatabaseSync } from "node:sqlite";
import { createRequire } from "node:module";
import { homedir } from "node:os";
import { basename, join, resolve } from "node:path";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
@ -26,6 +26,7 @@ const DEFAULT_CORE_TOKEN_BUDGET = 700;
const DEFAULT_RECALL_RESULTS = 4;
const DEFAULT_WRITER_MAX_TOKENS = 600;
const CUSTOM_MEMORY_TYPE = "companion_memory";
const require = createRequire(import.meta.url);
const MEMORY_WRITER_SYSTEM_PROMPT = `You manage long-term conversational memory for a companion agent.
@ -164,6 +165,50 @@ interface LegacyMemoryFile {
body: string;
}
interface SqliteStatementResult {
changes: number;
lastInsertRowid: number | bigint;
}
interface SqliteStatement {
run(...args: unknown[]): SqliteStatementResult;
get(...args: unknown[]): unknown;
all(...args: unknown[]): unknown[];
}
interface SqliteDatabase {
exec(sql: string): void;
prepare(sql: string): SqliteStatement;
close(): void;
}
type SqliteDatabaseConstructor = new (path: string) => SqliteDatabase;
let cachedSqliteDatabaseConstructor:
| SqliteDatabaseConstructor
| null
| undefined;
function loadSqliteDatabaseConstructor(): SqliteDatabaseConstructor | null {
if (cachedSqliteDatabaseConstructor !== undefined) {
return cachedSqliteDatabaseConstructor;
}
try {
const sqliteModule = require("node:sqlite") as {
DatabaseSync?: SqliteDatabaseConstructor;
};
cachedSqliteDatabaseConstructor =
typeof sqliteModule.DatabaseSync === "function"
? sqliteModule.DatabaseSync
: null;
} catch {
cachedSqliteDatabaseConstructor = null;
}
return cachedSqliteDatabaseConstructor;
}
function asRecord(value: unknown): Record<string, unknown> | null {
if (typeof value !== "object" || value === null || Array.isArray(value)) {
return null;
@ -530,7 +575,7 @@ export class RuntimeMemoryManager {
};
private readonly identity: RuntimeMemoryIdentity | null;
private readonly dbPath: string | null;
private readonly database: DatabaseSync | null;
private readonly database: SqliteDatabase | null;
constructor(params: {
sessionManager: ReadonlySessionManager;
@ -547,11 +592,18 @@ export class RuntimeMemoryManager {
return;
}
mkdirSync(this.settings.storageDir, { recursive: true });
this.dbPath = join(
this.settings.storageDir,
buildDbFileName(this.identity),
);
const DatabaseSync = loadSqliteDatabaseConstructor();
if (!DatabaseSync) {
this.database = null;
return;
}
mkdirSync(this.settings.storageDir, { recursive: true });
this.database = new DatabaseSync(this.dbPath);
this.database.exec("PRAGMA journal_mode = WAL;");
this.database.exec("PRAGMA busy_timeout = 5000;");
@ -610,7 +662,7 @@ export class RuntimeMemoryManager {
}
getStatus(): RuntimeMemoryStatus {
if (!this.database || !this.identity) {
if (!this.identity) {
return {
enabled: this.settings.enabled,
ready: false,
@ -625,6 +677,21 @@ export class RuntimeMemoryManager {
};
}
if (!this.database) {
return {
enabled: this.settings.enabled,
ready: false,
identity: this.identity,
storagePath: this.dbPath,
coreCount: 0,
archivalCount: 0,
episodeCount: 0,
lastMemoryWriteAt: null,
lastEpisodeAt: null,
legacyImportComplete: false,
};
}
const counts = this.database
.prepare(
`SELECT
@ -811,47 +878,69 @@ export class RuntimeMemoryManager {
.run(now, now, existing.id);
return this.getMemoryById(existing.id);
}
this.database
.prepare(
`UPDATE memories
SET active = 0, superseded_at = ?
WHERE id = ?`,
)
.run(now, existing.id);
}
let newId = 0;
this.database.exec("BEGIN IMMEDIATE;");
try {
if (existing) {
this.database
.prepare(
`UPDATE memories
SET active = 0, superseded_at = ?
WHERE id = ?`,
)
.run(now, existing.id);
}
const insertResult = this.database
.prepare(
`INSERT INTO memories (
bucket,
kind,
memory_key,
content,
search_text,
source,
active,
created_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?)`,
)
.run(
bucket,
kind,
memoryKey,
content,
createSearchText({
const insertResult = this.database
.prepare(
`INSERT INTO memories (
bucket,
kind,
memory_key,
content,
search_text,
source,
active,
created_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?)`,
)
.run(
bucket,
kind,
key: memoryKey,
memoryKey,
content,
}),
input.source ?? "manual",
now,
now,
);
createSearchText({
bucket,
kind,
key: memoryKey,
content,
}),
input.source ?? "manual",
now,
now,
);
return this.getMemoryById(Number(insertResult.lastInsertRowid));
newId = Number(insertResult.lastInsertRowid);
if (existing) {
this.database
.prepare(
`UPDATE memories
SET superseded_by_id = ?
WHERE id = ?`,
)
.run(newId, existing.id);
}
this.database.exec("COMMIT;");
} catch (error) {
this.database.exec("ROLLBACK;");
throw error;
}
return this.getMemoryById(newId);
}
forget(input: RuntimeMemoryForgetInput): { ok: true; forgotten: boolean } {
@ -928,8 +1017,6 @@ export class RuntimeMemoryManager {
.run(createEpisodeSearchText(row.role, row.text), row.id);
}
this.database.exec("VACUUM;");
return {
ok: true,
memoryRows: memoryRows.length,
@ -1022,9 +1109,9 @@ export class RuntimeMemoryManager {
};
return [
...messages.slice(0, lastUserIndex + 1),
...messages.slice(0, lastUserIndex),
injectedMessage,
...messages.slice(lastUserIndex + 1),
...messages.slice(lastUserIndex),
];
}