mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-15 19:05:11 +00:00
Refactor session manager: migration chain, validation, tests
- Add migrateV1ToV2/migrateToCurrentVersion for extensible migrations - createSummaryMessage now takes timestamp from entry - loadEntriesFromFile validates session header - findMostRecentSession only returns valid session files (reads first 512 bytes) - Remove ConversationEntry alias - Fix mom context.ts TreeNode type Tests: - migration.test.ts: v1 migration, idempotency - build-context.test.ts: 14 tests covering trivial, compaction, branches - file-operations.test.ts: loadEntriesFromFile, findMostRecentSession
This commit is contained in:
parent
95312e00bb
commit
beb70f126d
7 changed files with 606 additions and 102 deletions
|
|
@ -1,15 +1,22 @@
|
|||
import type { AppMessage } from "@mariozechner/pi-agent-core";
|
||||
import { randomUUID } from "crypto";
|
||||
import { appendFileSync, existsSync, mkdirSync, readdirSync, readFileSync, statSync, writeFileSync } from "fs";
|
||||
import {
|
||||
appendFileSync,
|
||||
closeSync,
|
||||
existsSync,
|
||||
mkdirSync,
|
||||
openSync,
|
||||
readdirSync,
|
||||
readFileSync,
|
||||
readSync,
|
||||
statSync,
|
||||
writeFileSync,
|
||||
} from "fs";
|
||||
import { join, resolve } from "path";
|
||||
import { getAgentDir as getDefaultAgentDir } from "../config.js";
|
||||
|
||||
export const CURRENT_SESSION_VERSION = 2;
|
||||
|
||||
// ============================================================================
|
||||
// Session Header (metadata, not part of conversation tree)
|
||||
// ============================================================================
|
||||
|
||||
export interface SessionHeader {
|
||||
type: "session";
|
||||
version?: number; // v1 sessions don't have this
|
||||
|
|
@ -19,20 +26,6 @@ export interface SessionHeader {
|
|||
branchedFrom?: string;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Tree Node (added by SessionManager to all conversation entries)
|
||||
// ============================================================================
|
||||
|
||||
export interface TreeNode {
|
||||
id: string;
|
||||
parentId: string | null;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Content Types (what distinguishes entries - used for input)
|
||||
// ============================================================================
|
||||
|
||||
export interface MessageContent {
|
||||
type: "message";
|
||||
message: AppMessage;
|
||||
|
|
@ -61,17 +54,20 @@ export interface BranchSummaryContent {
|
|||
summary: string;
|
||||
}
|
||||
|
||||
/** Union of all content types (for input) */
|
||||
export type ConversationContent =
|
||||
/** Union of all content types (for "write" methods in SessionManager) */
|
||||
export type SessionContent =
|
||||
| MessageContent
|
||||
| ThinkingLevelContent
|
||||
| ModelChangeContent
|
||||
| CompactionContent
|
||||
| BranchSummaryContent;
|
||||
|
||||
// ============================================================================
|
||||
// Full Entry Types (TreeNode + Content - returned from SessionManager)
|
||||
// ============================================================================
|
||||
export interface TreeNode {
|
||||
type: string;
|
||||
id: string;
|
||||
parentId: string | null;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
export type SessionMessageEntry = TreeNode & MessageContent;
|
||||
export type ThinkingLevelChangeEntry = TreeNode & ThinkingLevelContent;
|
||||
|
|
@ -79,7 +75,7 @@ export type ModelChangeEntry = TreeNode & ModelChangeContent;
|
|||
export type CompactionEntry = TreeNode & CompactionContent;
|
||||
export type BranchSummaryEntry = TreeNode & BranchSummaryContent;
|
||||
|
||||
/** Session entry - has id/parentId for tree structure */
|
||||
/** Session entry - has id/parentId for tree structure (returned by "read" methods in SessionManager) */
|
||||
export type SessionEntry =
|
||||
| SessionMessageEntry
|
||||
| ThinkingLevelChangeEntry
|
||||
|
|
@ -87,9 +83,6 @@ export type SessionEntry =
|
|||
| CompactionEntry
|
||||
| BranchSummaryEntry;
|
||||
|
||||
/** @deprecated Use SessionEntry */
|
||||
export type ConversationEntry = SessionEntry;
|
||||
|
||||
/** Raw file entry (includes header) */
|
||||
export type FileEntry = SessionHeader | SessionEntry;
|
||||
|
||||
|
|
@ -118,46 +111,46 @@ export const SUMMARY_SUFFIX = `
|
|||
</summary>`;
|
||||
|
||||
/** Exported for compaction.test.ts */
|
||||
export function createSummaryMessage(summary: string): AppMessage {
|
||||
export function createSummaryMessage(summary: string, timestamp: string): AppMessage {
|
||||
return {
|
||||
role: "user",
|
||||
content: SUMMARY_PREFIX + summary + SUMMARY_SUFFIX,
|
||||
timestamp: Date.now(),
|
||||
timestamp: new Date(timestamp).getTime(),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Migrate v1 entries to v2 format by adding id/parentId fields.
|
||||
* Mutates entries in place. Safe to call on already-migrated entries.
|
||||
*/
|
||||
export function migrateSessionEntries(entries: FileEntry[]): void {
|
||||
// Check if already migrated
|
||||
const firstConv = entries.find((e) => e.type !== "session");
|
||||
if (firstConv && "id" in firstConv && firstConv.id) {
|
||||
return; // Already migrated
|
||||
/** Generate a unique short ID (8 hex chars, collision-checked) */
|
||||
function generateId(byId: { has(id: string): boolean }): string {
|
||||
for (let i = 0; i < 100; i++) {
|
||||
const id = randomUUID().slice(0, 8);
|
||||
if (!byId.has(id)) return id;
|
||||
}
|
||||
// Fallback to full UUID if somehow we have collisions
|
||||
return randomUUID();
|
||||
}
|
||||
|
||||
/** Migrate v1 → v2: add id/parentId tree structure. Mutates in place. */
|
||||
function migrateV1ToV2(entries: FileEntry[]): void {
|
||||
const ids = new Set<string>();
|
||||
let prevId: string | null = null;
|
||||
|
||||
for (const entry of entries) {
|
||||
if (entry.type === "session") {
|
||||
entry.version = CURRENT_SESSION_VERSION;
|
||||
entry.version = 2;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Add id/parentId to conversation entries
|
||||
const convEntry = entry as ConversationEntry;
|
||||
convEntry.id = randomUUID();
|
||||
convEntry.parentId = prevId;
|
||||
prevId = convEntry.id;
|
||||
entry.id = generateId(ids);
|
||||
entry.parentId = prevId;
|
||||
prevId = entry.id;
|
||||
|
||||
// Convert firstKeptEntryIndex to firstKeptEntryId for compaction
|
||||
if (entry.type === "compaction") {
|
||||
const comp = entry as CompactionEntry & { firstKeptEntryIndex?: number };
|
||||
if (typeof comp.firstKeptEntryIndex === "number") {
|
||||
// Find the entry at that index and get its id
|
||||
const targetEntry = entries[comp.firstKeptEntryIndex];
|
||||
if (targetEntry && targetEntry.type !== "session") {
|
||||
comp.firstKeptEntryId = (targetEntry as ConversationEntry).id;
|
||||
comp.firstKeptEntryId = targetEntry.id;
|
||||
}
|
||||
delete comp.firstKeptEntryIndex;
|
||||
}
|
||||
|
|
@ -165,15 +158,39 @@ export function migrateSessionEntries(entries: FileEntry[]): void {
|
|||
}
|
||||
}
|
||||
|
||||
// Add future migrations here:
|
||||
// function migrateV2ToV3(entries: FileEntry[]): void { ... }
|
||||
|
||||
/**
|
||||
* Run all necessary migrations to bring entries to current version.
|
||||
* Mutates entries in place. Returns true if any migration was applied.
|
||||
*/
|
||||
function migrateToCurrentVersion(entries: FileEntry[]): boolean {
|
||||
const header = entries.find((e) => e.type === "session") as SessionHeader | undefined;
|
||||
const version = header?.version ?? 1;
|
||||
|
||||
if (version >= CURRENT_SESSION_VERSION) return false;
|
||||
|
||||
if (version < 2) migrateV1ToV2(entries);
|
||||
// if (version < 3) migrateV2ToV3(entries);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Exported for testing */
|
||||
export function migrateSessionEntries(entries: FileEntry[]): void {
|
||||
migrateToCurrentVersion(entries);
|
||||
}
|
||||
|
||||
/** Exported for compaction.test.ts */
|
||||
export function parseSessionEntries(content: string): FileEntry[] {
|
||||
const entries: SessionEntry[] = [];
|
||||
const entries: FileEntry[] = [];
|
||||
const lines = content.trim().split("\n");
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
try {
|
||||
const entry = JSON.parse(line) as SessionEntry;
|
||||
const entry = JSON.parse(line) as FileEntry;
|
||||
entries.push(entry);
|
||||
} catch {
|
||||
// Skip malformed lines
|
||||
|
|
@ -197,18 +214,26 @@ export function getLatestCompactionEntry(entries: SessionEntry[]): CompactionEnt
|
|||
* If leafId is provided, walks from that entry to root.
|
||||
* Handles compaction and branch summaries along the path.
|
||||
*/
|
||||
export function buildSessionContext(entries: SessionEntry[], leafId?: string): SessionContext {
|
||||
// Build uuid index
|
||||
const byId = new Map<string, SessionEntry>();
|
||||
for (const entry of entries) {
|
||||
byId.set(entry.id, entry);
|
||||
export function buildSessionContext(
|
||||
entries: SessionEntry[],
|
||||
leafId?: string,
|
||||
byId?: Map<string, SessionEntry>,
|
||||
): SessionContext {
|
||||
// Build uuid index if not available
|
||||
if (!byId) {
|
||||
byId = new Map<string, SessionEntry>();
|
||||
for (const entry of entries) {
|
||||
byId.set(entry.id, entry);
|
||||
}
|
||||
}
|
||||
|
||||
// Find leaf
|
||||
let leaf: SessionEntry | undefined;
|
||||
if (leafId) {
|
||||
leaf = byId.get(leafId);
|
||||
} else {
|
||||
}
|
||||
if (!leaf) {
|
||||
// Fallback to last entry
|
||||
leaf = entries[entries.length - 1];
|
||||
}
|
||||
|
||||
|
|
@ -250,7 +275,7 @@ export function buildSessionContext(entries: SessionEntry[], leafId?: string): S
|
|||
|
||||
if (compaction) {
|
||||
// Emit summary first
|
||||
messages.push(createSummaryMessage(compaction.summary));
|
||||
messages.push(createSummaryMessage(compaction.summary, compaction.timestamp));
|
||||
|
||||
// Find compaction index in path
|
||||
const compactionIdx = path.findIndex((e) => e.type === "compaction" && e.id === compaction.id);
|
||||
|
|
@ -273,7 +298,7 @@ export function buildSessionContext(entries: SessionEntry[], leafId?: string): S
|
|||
if (entry.type === "message") {
|
||||
messages.push(entry.message);
|
||||
} else if (entry.type === "branch_summary") {
|
||||
messages.push(createSummaryMessage(entry.summary));
|
||||
messages.push(createSummaryMessage(entry.summary, entry.timestamp));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
@ -282,7 +307,7 @@ export function buildSessionContext(entries: SessionEntry[], leafId?: string): S
|
|||
if (entry.type === "message") {
|
||||
messages.push(entry.message);
|
||||
} else if (entry.type === "branch_summary") {
|
||||
messages.push(createSummaryMessage(entry.summary));
|
||||
messages.push(createSummaryMessage(entry.summary, entry.timestamp));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -303,34 +328,57 @@ function getDefaultSessionDir(cwd: string): string {
|
|||
return sessionDir;
|
||||
}
|
||||
|
||||
function loadEntriesFromFile(filePath: string): FileEntry[] {
|
||||
/** Exported for testing */
|
||||
export function loadEntriesFromFile(filePath: string): FileEntry[] {
|
||||
if (!existsSync(filePath)) return [];
|
||||
|
||||
const content = readFileSync(filePath, "utf8");
|
||||
const entries: SessionEntry[] = [];
|
||||
const entries: FileEntry[] = [];
|
||||
const lines = content.trim().split("\n");
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
try {
|
||||
const entry = JSON.parse(line) as SessionEntry;
|
||||
const entry = JSON.parse(line) as FileEntry;
|
||||
entries.push(entry);
|
||||
} catch {
|
||||
// Skip malformed lines
|
||||
}
|
||||
}
|
||||
|
||||
// Validate session header
|
||||
if (entries.length === 0) return entries;
|
||||
const header = entries[0];
|
||||
if (header.type !== "session" || typeof (header as any).id !== "string") {
|
||||
return [];
|
||||
}
|
||||
|
||||
return entries;
|
||||
}
|
||||
|
||||
function findMostRecentSession(sessionDir: string): string | null {
|
||||
function isValidSessionFile(filePath: string): boolean {
|
||||
try {
|
||||
const fd = openSync(filePath, "r");
|
||||
const buffer = Buffer.alloc(512);
|
||||
const bytesRead = readSync(fd, buffer, 0, 512, 0);
|
||||
closeSync(fd);
|
||||
const firstLine = buffer.toString("utf8", 0, bytesRead).split("\n")[0];
|
||||
if (!firstLine) return false;
|
||||
const header = JSON.parse(firstLine);
|
||||
return header.type === "session" && typeof header.id === "string";
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/** Exported for testing */
|
||||
export function findMostRecentSession(sessionDir: string): string | null {
|
||||
try {
|
||||
const files = readdirSync(sessionDir)
|
||||
.filter((f) => f.endsWith(".jsonl"))
|
||||
.map((f) => ({
|
||||
path: join(sessionDir, f),
|
||||
mtime: statSync(join(sessionDir, f)).mtime,
|
||||
}))
|
||||
.map((f) => join(sessionDir, f))
|
||||
.filter(isValidSessionFile)
|
||||
.map((path) => ({ path, mtime: statSync(path).mtime }))
|
||||
.sort((a, b) => b.mtime.getTime() - a.mtime.getTime());
|
||||
|
||||
return files[0]?.path || null;
|
||||
|
|
@ -347,21 +395,9 @@ export class SessionManager {
|
|||
private persist: boolean;
|
||||
private flushed: boolean = false;
|
||||
private inMemoryEntries: FileEntry[] = [];
|
||||
|
||||
// Tree structure (v2)
|
||||
private byId: Map<string, SessionEntry> = new Map();
|
||||
private leafId: string = "";
|
||||
|
||||
/** Generate a unique short ID (8 hex chars, collision-checked) */
|
||||
private _generateId(): string {
|
||||
for (let i = 0; i < 100; i++) {
|
||||
const id = randomUUID().slice(0, 8);
|
||||
if (!this.byId.has(id)) return id;
|
||||
}
|
||||
// Fallback to full UUID if somehow we have collisions
|
||||
return randomUUID();
|
||||
}
|
||||
|
||||
private constructor(cwd: string, sessionDir: string, sessionFile: string | null, persist: boolean) {
|
||||
this.cwd = cwd;
|
||||
this.sessionDir = sessionDir;
|
||||
|
|
@ -385,10 +421,7 @@ export class SessionManager {
|
|||
const header = this.inMemoryEntries.find((e) => e.type === "session") as SessionHeader | undefined;
|
||||
this.sessionId = header?.id ?? randomUUID();
|
||||
|
||||
// Migrate v1 to v2 if needed
|
||||
const version = header?.version ?? 1;
|
||||
if (version < CURRENT_SESSION_VERSION) {
|
||||
this._migrateToV2();
|
||||
if (migrateToCurrentVersion(this.inMemoryEntries)) {
|
||||
this._rewriteFile();
|
||||
}
|
||||
|
||||
|
|
@ -420,10 +453,6 @@ export class SessionManager {
|
|||
}
|
||||
}
|
||||
|
||||
private _migrateToV2(): void {
|
||||
migrateSessionEntries(this.inMemoryEntries);
|
||||
}
|
||||
|
||||
private _buildIndex(): void {
|
||||
this.byId.clear();
|
||||
this.leafId = "";
|
||||
|
|
@ -480,7 +509,7 @@ export class SessionManager {
|
|||
}
|
||||
}
|
||||
|
||||
private _appendEntry(entry: ConversationEntry): void {
|
||||
private _appendEntry(entry: SessionEntry): void {
|
||||
this.inMemoryEntries.push(entry);
|
||||
this.byId.set(entry.id, entry);
|
||||
this.leafId = entry.id;
|
||||
|
|
@ -490,7 +519,7 @@ export class SessionManager {
|
|||
saveMessage(message: AppMessage): string {
|
||||
const entry: SessionMessageEntry = {
|
||||
type: "message",
|
||||
id: this._generateId(),
|
||||
id: generateId(this.byId),
|
||||
parentId: this.leafId || null,
|
||||
timestamp: new Date().toISOString(),
|
||||
message,
|
||||
|
|
@ -502,7 +531,7 @@ export class SessionManager {
|
|||
saveThinkingLevelChange(thinkingLevel: string): string {
|
||||
const entry: ThinkingLevelChangeEntry = {
|
||||
type: "thinking_level_change",
|
||||
id: this._generateId(),
|
||||
id: generateId(this.byId),
|
||||
parentId: this.leafId || null,
|
||||
timestamp: new Date().toISOString(),
|
||||
thinkingLevel,
|
||||
|
|
@ -514,7 +543,7 @@ export class SessionManager {
|
|||
saveModelChange(provider: string, modelId: string): string {
|
||||
const entry: ModelChangeEntry = {
|
||||
type: "model_change",
|
||||
id: this._generateId(),
|
||||
id: generateId(this.byId),
|
||||
parentId: this.leafId || null,
|
||||
timestamp: new Date().toISOString(),
|
||||
provider,
|
||||
|
|
@ -527,7 +556,7 @@ export class SessionManager {
|
|||
saveCompaction(summary: string, firstKeptEntryId: string, tokensBefore: number): string {
|
||||
const entry: CompactionEntry = {
|
||||
type: "compaction",
|
||||
id: this._generateId(),
|
||||
id: generateId(this.byId),
|
||||
parentId: this.leafId || null,
|
||||
timestamp: new Date().toISOString(),
|
||||
summary,
|
||||
|
|
@ -546,13 +575,13 @@ export class SessionManager {
|
|||
return this.leafId;
|
||||
}
|
||||
|
||||
getEntry(id: string): ConversationEntry | undefined {
|
||||
getEntry(id: string): SessionEntry | undefined {
|
||||
return this.byId.get(id);
|
||||
}
|
||||
|
||||
/** Walk from entry to root, returning path (conversation entries only) */
|
||||
getPath(fromId?: string): ConversationEntry[] {
|
||||
const path: ConversationEntry[] = [];
|
||||
getPath(fromId?: string): SessionEntry[] {
|
||||
const path: SessionEntry[] = [];
|
||||
let current = this.byId.get(fromId ?? this.leafId);
|
||||
while (current) {
|
||||
path.unshift(current);
|
||||
|
|
@ -566,7 +595,7 @@ export class SessionManager {
|
|||
* Uses tree traversal from current leaf.
|
||||
*/
|
||||
buildSessionContext(): SessionContext {
|
||||
return buildSessionContext(this.getEntries(), this.leafId);
|
||||
return buildSessionContext(this.getEntries(), this.leafId, this.byId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -605,7 +634,7 @@ export class SessionManager {
|
|||
this.leafId = branchFromId;
|
||||
const entry: BranchSummaryEntry = {
|
||||
type: "branch_summary",
|
||||
id: this._generateId(),
|
||||
id: generateId(this.byId),
|
||||
parentId: branchFromId,
|
||||
timestamp: new Date().toISOString(),
|
||||
summary,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue