diff --git a/packages/mom/CHANGELOG.md b/packages/mom/CHANGELOG.md index 914d4b55..03cc3e92 100644 --- a/packages/mom/CHANGELOG.md +++ b/packages/mom/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Fixed + +- Use coding-agent's SessionManager instead of custom MomSessionManager to fix API mismatch crash ([#595](https://github.com/badlogic/pi-mono/issues/595)) + ## [0.42.4] - 2026-01-10 ## [0.42.3] - 2026-01-10 diff --git a/packages/mom/src/agent.ts b/packages/mom/src/agent.ts index 2be0bf3b..fc222fb9 100644 --- a/packages/mom/src/agent.ts +++ b/packages/mom/src/agent.ts @@ -7,13 +7,14 @@ import { formatSkillsForPrompt, loadSkillsFromDir, ModelRegistry, + SessionManager, type Skill, } from "@mariozechner/pi-coding-agent"; import { existsSync, readFileSync } from "fs"; import { mkdir, writeFile } from "fs/promises"; import { homedir } from "os"; import { join } from "path"; -import { MomSessionManager, MomSettingsManager } from "./context.js"; +import { MomSettingsManager, syncLogToSessionManager } from "./context.js"; import * as log from "./log.js"; import { createExecutor, type SandboxConfig } from "./sandbox.js"; import type { ChannelInfo, SlackContext, UserInfo } from "./slack.js"; @@ -418,7 +419,9 @@ function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDi const systemPrompt = buildSystemPrompt(workspacePath, channelId, memory, sandboxConfig, [], [], skills); // Create session manager and settings manager - const sessionManager = new MomSessionManager(channelDir); + // Use a fixed context.jsonl file per channel (not timestamped like coding-agent) + const contextFile = join(channelDir, "context.jsonl"); + const sessionManager = SessionManager.open(contextFile, channelDir); const settingsManager = new MomSettingsManager(join(channelDir, "..")); // Create AuthStorage and ModelRegistry @@ -439,7 +442,7 @@ function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDi }); // Load existing messages - const loadedSession = sessionManager.buildSessionContex(); + const loadedSession = sessionManager.buildSessionContext(); if (loadedSession.messages.length > 0) { agent.replaceMessages(loadedSession.messages); log.logInfo(`[${channelId}] Loaded ${loadedSession.messages.length} messages from context.jsonl`); @@ -448,7 +451,7 @@ function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDi // Create AgentSession wrapper const session = new AgentSession({ agent, - sessionManager: sessionManager as any, + sessionManager, settingsManager: settingsManager as any, modelRegistry, }); @@ -624,9 +627,16 @@ function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDi // Ensure channel directory exists await mkdir(channelDir, { recursive: true }); + // Sync messages from log.jsonl that arrived while we were offline or busy + // Exclude the current message (it will be added via prompt()) + const syncedCount = syncLogToSessionManager(sessionManager, channelDir, ctx.message.ts); + if (syncedCount > 0) { + log.logInfo(`[${channelId}] Synced ${syncedCount} messages from log.jsonl`); + } + // Reload messages from context.jsonl - // This picks up any messages synced from log.jsonl before this run - const reloadedSession = sessionManager.buildSessionContex(); + // This picks up any messages synced above + const reloadedSession = sessionManager.buildSessionContext(); if (reloadedSession.messages.length > 0) { agent.replaceMessages(reloadedSession.messages); log.logInfo(`[${channelId}] Reloaded ${reloadedSession.messages.length} messages from context`); diff --git a/packages/mom/src/context.ts b/packages/mom/src/context.ts index f5a106da..e2c6ca1b 100644 --- a/packages/mom/src/context.ts +++ b/packages/mom/src/context.ts @@ -6,363 +6,139 @@ * - log.jsonl: Human-readable channel history for grep (no tool results) * * This module provides: - * - MomSessionManager: Adapts coding-agent's SessionManager for channel-based storage + * - syncLogToSessionManager: Syncs messages from log.jsonl to SessionManager * - MomSettingsManager: Simple settings for mom (compaction, retry, model preferences) */ -import type { AgentMessage } from "@mariozechner/pi-agent-core"; -import { - buildSessionContext, - type CompactionEntry, - type FileEntry, - type ModelChangeEntry, - type SessionContext, - type SessionEntry, - type SessionEntryBase, - type SessionMessageEntry, - type ThinkingLevelChangeEntry, -} from "@mariozechner/pi-coding-agent"; -import { randomBytes } from "crypto"; -import { appendFileSync, existsSync, mkdirSync, readFileSync, writeFileSync } from "fs"; +import type { UserMessage } from "@mariozechner/pi-ai"; +import type { SessionManager, SessionMessageEntry } from "@mariozechner/pi-coding-agent"; +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "fs"; import { dirname, join } from "path"; -function uuidv4(): string { - const bytes = randomBytes(16); - bytes[6] = (bytes[6] & 0x0f) | 0x40; - bytes[8] = (bytes[8] & 0x3f) | 0x80; - const hex = bytes.toString("hex"); - return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20, 32)}`; +// ============================================================================ +// Sync log.jsonl to SessionManager +// ============================================================================ + +interface LogMessage { + date?: string; + ts?: string; + user?: string; + userName?: string; + text?: string; + isBot?: boolean; } -// ============================================================================ -// MomSessionManager - Channel-based session management -// ============================================================================ - /** - * Session manager for mom, storing context per Slack channel. + * Sync user messages from log.jsonl to SessionManager. * - * Unlike coding-agent which creates timestamped session files, mom uses - * a single context.jsonl per channel that persists across all @mentions. + * This ensures that messages logged while mom wasn't running (channel chatter, + * backfilled messages, messages while busy) are added to the LLM context. + * + * @param sessionManager - The SessionManager to sync to + * @param channelDir - Path to channel directory containing log.jsonl + * @param excludeSlackTs - Slack timestamp of current message (will be added via prompt(), not sync) + * @returns Number of messages synced */ -export class MomSessionManager { - private sessionId: string; - private contextFile: string; - private logFile: string; - private channelDir: string; - private flushed: boolean = false; - private inMemoryEntries: FileEntry[] = []; - private leafId: string | null = null; +export function syncLogToSessionManager( + sessionManager: SessionManager, + channelDir: string, + excludeSlackTs?: string, +): number { + const logFile = join(channelDir, "log.jsonl"); - constructor(channelDir: string) { - this.channelDir = channelDir; - this.contextFile = join(channelDir, "context.jsonl"); - this.logFile = join(channelDir, "log.jsonl"); + if (!existsSync(logFile)) return 0; - // Ensure channel directory exists - if (!existsSync(channelDir)) { - mkdirSync(channelDir, { recursive: true }); - } - - // Load existing session or create new - if (existsSync(this.contextFile)) { - this.inMemoryEntries = this.loadEntriesFromFile(); - this.sessionId = this.extractSessionId() || uuidv4(); - this._updateLeafId(); - this.flushed = true; - } else { - this.sessionId = uuidv4(); - this.inMemoryEntries = [ - { - type: "session", - version: 2, - id: this.sessionId, - timestamp: new Date().toISOString(), - cwd: this.channelDir, - }, - ]; - } - // Note: syncFromLog() is called explicitly from agent.ts with excludeTimestamp - } - - private _updateLeafId(): void { - for (let i = this.inMemoryEntries.length - 1; i >= 0; i--) { - const entry = this.inMemoryEntries[i]; - if (entry.type !== "session") { - this.leafId = entry.id; - return; - } - } - this.leafId = null; - } - - private _createEntryBase(): Omit { - const id = uuidv4(); - const base = { - id, - parentId: this.leafId, - timestamp: new Date().toISOString(), - }; - this.leafId = id; - return base; - } - - private _persist(entry: SessionEntry): void { - const hasAssistant = this.inMemoryEntries.some((e) => e.type === "message" && e.message.role === "assistant"); - if (!hasAssistant) return; - - if (!this.flushed) { - for (const e of this.inMemoryEntries) { - appendFileSync(this.contextFile, `${JSON.stringify(e)}\n`); - } - this.flushed = true; - } else { - appendFileSync(this.contextFile, `${JSON.stringify(entry)}\n`); - } - } - - /** - * Sync user messages from log.jsonl that aren't in context.jsonl. - * - * log.jsonl and context.jsonl must have the same user messages. - * This handles: - * - Backfilled messages (mom was offline) - * - Messages that arrived while mom was processing a previous turn - * - Channel chatter between @mentions - * - * Channel chatter is formatted as "[username]: message" to distinguish from direct @mentions. - * - * Called before each agent run. - * - * @param excludeSlackTs Slack timestamp of current message (will be added via prompt(), not sync) - */ - syncFromLog(excludeSlackTs?: string): void { - if (!existsSync(this.logFile)) return; - - // Build set of Slack timestamps already in context - // We store slackTs in the message content or can extract from formatted messages - // For messages synced from log, we use the log's date as the entry timestamp - // For messages added via prompt(), they have different timestamps - // So we need to match by content OR by stored slackTs - const contextSlackTimestamps = new Set(); - const contextMessageTexts = new Set(); - - for (const entry of this.inMemoryEntries) { - if (entry.type === "message") { - const msgEntry = entry as SessionMessageEntry; - // Store the entry timestamp (which is the log date for synced messages) - contextSlackTimestamps.add(entry.timestamp); - - // Also store message text to catch duplicates added via prompt() - // AgentMessage has different shapes, check for content property - const msg = msgEntry.message as { role: string; content?: unknown }; - if (msg.role === "user" && msg.content !== undefined) { - const content = msg.content; - if (typeof content === "string") { - contextMessageTexts.add(content); - } else if (Array.isArray(content)) { - for (const part of content) { - if ( - typeof part === "object" && - part !== null && - "type" in part && - part.type === "text" && - "text" in part - ) { - contextMessageTexts.add((part as { type: "text"; text: string }).text); + // Build set of existing message content from session + const existingMessages = new Set(); + for (const entry of sessionManager.getEntries()) { + if (entry.type === "message") { + const msgEntry = entry as SessionMessageEntry; + const msg = msgEntry.message as { role: string; content?: unknown }; + if (msg.role === "user" && msg.content !== undefined) { + const content = msg.content; + if (typeof content === "string") { + // Strip timestamp prefix for comparison (live messages have it, synced don't) + // Format: [YYYY-MM-DD HH:MM:SS+HH:MM] [username]: text + let normalized = content.replace(/^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}\] /, ""); + // Strip attachments section + const attachmentsIdx = normalized.indexOf("\n\n\n"); + if (attachmentsIdx !== -1) { + normalized = normalized.substring(0, attachmentsIdx); + } + existingMessages.add(normalized); + } else if (Array.isArray(content)) { + for (const part of content) { + if ( + typeof part === "object" && + part !== null && + "type" in part && + part.type === "text" && + "text" in part + ) { + let normalized = (part as { type: "text"; text: string }).text; + normalized = normalized.replace(/^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}\] /, ""); + const attachmentsIdx = normalized.indexOf("\n\n\n"); + if (attachmentsIdx !== -1) { + normalized = normalized.substring(0, attachmentsIdx); } + existingMessages.add(normalized); } } } } } + } - // Read log.jsonl and find user messages not in context - const logContent = readFileSync(this.logFile, "utf-8"); - const logLines = logContent.trim().split("\n").filter(Boolean); + // Read log.jsonl and find user messages not in context + const logContent = readFileSync(logFile, "utf-8"); + const logLines = logContent.trim().split("\n").filter(Boolean); - interface LogMessage { - date?: string; - ts?: string; - user?: string; - userName?: string; - text?: string; - isBot?: boolean; - } + const newMessages: Array<{ timestamp: number; message: UserMessage }> = []; - const newMessages: Array<{ timestamp: string; slackTs: string; message: AgentMessage }> = []; + for (const line of logLines) { + try { + const logMsg: LogMessage = JSON.parse(line); - for (const line of logLines) { - try { - const logMsg: LogMessage = JSON.parse(line); + const slackTs = logMsg.ts; + const date = logMsg.date; + if (!slackTs || !date) continue; - const slackTs = logMsg.ts; - const date = logMsg.date; - if (!slackTs || !date) continue; + // Skip the current message being processed (will be added via prompt()) + if (excludeSlackTs && slackTs === excludeSlackTs) continue; - // Skip the current message being processed (will be added via prompt()) - if (excludeSlackTs && slackTs === excludeSlackTs) continue; + // Skip bot messages - added through agent flow + if (logMsg.isBot) continue; - // Skip bot messages - added through agent flow - if (logMsg.isBot) continue; + // Build the message text as it would appear in context + const messageText = `[${logMsg.userName || logMsg.user || "unknown"}]: ${logMsg.text || ""}`; - // Skip if this date is already in context (was synced before) - if (contextSlackTimestamps.has(date)) continue; + // Skip if this exact message text is already in context + if (existingMessages.has(messageText)) continue; - // Build the message text as it would appear in context - const messageText = `[${logMsg.userName || logMsg.user || "unknown"}]: ${logMsg.text || ""}`; - - // Skip if this exact message text is already in context (added via prompt()) - if (contextMessageTexts.has(messageText)) continue; - - const msgTime = new Date(date).getTime() || Date.now(); - const userMessage: AgentMessage = { - role: "user", - content: messageText, - timestamp: msgTime, - }; - - newMessages.push({ timestamp: date, slackTs, message: userMessage }); - } catch { - // Skip malformed lines - } - } - - if (newMessages.length === 0) return; - - // Sort by timestamp and add to context - newMessages.sort((a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime()); - - for (const { timestamp, message } of newMessages) { - const id = uuidv4(); - const entry: SessionMessageEntry = { - type: "message", - id, - parentId: this.leafId, - timestamp, // Use log date as entry timestamp for consistent deduplication - message, + const msgTime = new Date(date).getTime() || Date.now(); + const userMessage: UserMessage = { + role: "user", + content: [{ type: "text", text: messageText }], + timestamp: msgTime, }; - this.leafId = id; - this.inMemoryEntries.push(entry); - appendFileSync(this.contextFile, `${JSON.stringify(entry)}\n`); + newMessages.push({ timestamp: msgTime, message: userMessage }); + existingMessages.add(messageText); // Track to avoid duplicates within this sync + } catch { + // Skip malformed lines } } - private extractSessionId(): string | null { - for (const entry of this.inMemoryEntries) { - if (entry.type === "session") { - return entry.id; - } - } - return null; + if (newMessages.length === 0) return 0; + + // Sort by timestamp and add to session + newMessages.sort((a, b) => a.timestamp - b.timestamp); + + for (const { message } of newMessages) { + sessionManager.appendMessage(message); } - private loadEntriesFromFile(): FileEntry[] { - if (!existsSync(this.contextFile)) return []; - - const content = readFileSync(this.contextFile, "utf8"); - const entries: FileEntry[] = []; - const lines = content.trim().split("\n"); - - for (const line of lines) { - if (!line.trim()) continue; - try { - const entry = JSON.parse(line) as FileEntry; - entries.push(entry); - } catch { - // Skip malformed lines - } - } - - return entries; - } - - saveMessage(message: AgentMessage): void { - const entry: SessionMessageEntry = { ...this._createEntryBase(), type: "message", message }; - this.inMemoryEntries.push(entry); - this._persist(entry); - } - - saveThinkingLevelChange(thinkingLevel: string): void { - const entry: ThinkingLevelChangeEntry = { - ...this._createEntryBase(), - type: "thinking_level_change", - thinkingLevel, - }; - this.inMemoryEntries.push(entry); - this._persist(entry); - } - - saveModelChange(provider: string, modelId: string): void { - const entry: ModelChangeEntry = { ...this._createEntryBase(), type: "model_change", provider, modelId }; - this.inMemoryEntries.push(entry); - this._persist(entry); - } - - saveCompaction(entry: CompactionEntry): void { - this.inMemoryEntries.push(entry); - this._persist(entry); - } - - /** Load session with compaction support */ - buildSessionContex(): SessionContext { - const entries = this.loadEntries(); - return buildSessionContext(entries); - } - - loadEntries(): SessionEntry[] { - // Re-read from file to get latest state - const entries = existsSync(this.contextFile) ? this.loadEntriesFromFile() : this.inMemoryEntries; - return entries.filter((e): e is SessionEntry => e.type !== "session"); - } - - getSessionId(): string { - return this.sessionId; - } - - getSessionFile(): string { - return this.contextFile; - } - - /** Reset session (clears context.jsonl) */ - reset(): void { - this.sessionId = uuidv4(); - this.flushed = false; - this.inMemoryEntries = [ - { - type: "session", - id: this.sessionId, - timestamp: new Date().toISOString(), - cwd: this.channelDir, - }, - ]; - // Truncate the context file - if (existsSync(this.contextFile)) { - writeFileSync(this.contextFile, ""); - } - } - - // Compatibility methods for AgentSession - isPersisted(): boolean { - return true; - } - - setSessionFile(_path: string): void { - // No-op for mom - we always use the channel's context.jsonl - } - - loadModel(): { provider: string; modelId: string } | null { - return this.buildSessionContex().model; - } - - loadThinkingLevel(): string { - return this.buildSessionContex().thinkingLevel; - } - - /** Not used by mom but required by AgentSession interface */ - createBranchedSession(_leafId: string): string | null { - return null; // Mom doesn't support branching - } + return newMessages.length; } // ============================================================================ @@ -519,127 +295,3 @@ export class MomSettingsManager { return 30000; } } - -// ============================================================================ -// Sync log.jsonl to context.jsonl -// ============================================================================ - -/** - * Sync user messages from log.jsonl to context.jsonl. - * - * This ensures that messages logged while mom wasn't running (channel chatter, - * backfilled messages, messages while busy) are added to the LLM context. - * - * @param channelDir - Path to channel directory - * @param excludeAfterTs - Don't sync messages with ts >= this value (they'll be handled by agent) - * @returns Number of messages synced - */ -export function syncLogToContext(channelDir: string, excludeAfterTs?: string): number { - const logFile = join(channelDir, "log.jsonl"); - const contextFile = join(channelDir, "context.jsonl"); - - if (!existsSync(logFile)) return 0; - - // Read all user messages from log.jsonl - const logContent = readFileSync(logFile, "utf-8"); - const logLines = logContent.trim().split("\n").filter(Boolean); - - interface LogEntry { - ts: string; - user: string; - userName?: string; - text: string; - isBot: boolean; - } - - const logMessages: LogEntry[] = []; - for (const line of logLines) { - try { - const entry = JSON.parse(line) as LogEntry; - // Only sync user messages (not bot responses) - if (!entry.isBot && entry.ts && entry.text) { - // Skip if >= excludeAfterTs - if (excludeAfterTs && entry.ts >= excludeAfterTs) continue; - logMessages.push(entry); - } - } catch {} - } - - if (logMessages.length === 0) return 0; - - // Read existing timestamps from context.jsonl - if (existsSync(contextFile)) { - const contextContent = readFileSync(contextFile, "utf-8"); - const contextLines = contextContent.trim().split("\n").filter(Boolean); - for (const line of contextLines) { - try { - const entry = JSON.parse(line); - if (entry.type === "message" && entry.message?.role === "user" && entry.message?.timestamp) { - // Extract ts from timestamp (ms -> slack ts format for comparison) - // We store the original slack ts in a way we can recover - // Actually, let's just check by content match since ts formats differ - } - } catch {} - } - } - - // For deduplication, we need to track what's already in context - // Read context and extract user message content (strip attachments section for comparison) - const existingMessages = new Set(); - if (existsSync(contextFile)) { - const contextContent = readFileSync(contextFile, "utf-8"); - const contextLines = contextContent.trim().split("\n").filter(Boolean); - for (const line of contextLines) { - try { - const entry = JSON.parse(line); - if (entry.type === "message" && entry.message?.role === "user") { - let content = - typeof entry.message.content === "string" ? entry.message.content : entry.message.content?.[0]?.text; - if (content) { - // Strip timestamp prefix for comparison (live messages have it, log messages don't) - // Format: [YYYY-MM-DD HH:MM:SS+HH:MM] [username]: text - content = content.replace(/^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}\] /, ""); - // Strip attachments section for comparison (live messages have it, log messages don't) - const attachmentsIdx = content.indexOf("\n\n\n"); - if (attachmentsIdx !== -1) { - content = content.substring(0, attachmentsIdx); - } - existingMessages.add(content); - } - } - } catch {} - } - } - - // Add missing messages to context.jsonl - let syncedCount = 0; - for (const msg of logMessages) { - const userName = msg.userName || msg.user; - const content = `[${userName}]: ${msg.text}`; - - // Skip if already in context - if (existingMessages.has(content)) continue; - - const timestamp = Math.floor(parseFloat(msg.ts) * 1000); - const entry = { - type: "message", - timestamp: new Date(timestamp).toISOString(), - message: { - role: "user", - content, - timestamp, - }, - }; - - // Ensure directory exists - if (!existsSync(channelDir)) { - mkdirSync(channelDir, { recursive: true }); - } - - appendFileSync(contextFile, `${JSON.stringify(entry)}\n`); - existingMessages.add(content); // Track to avoid duplicates within this sync - syncedCount++; - } - - return syncedCount; -} diff --git a/packages/mom/src/main.ts b/packages/mom/src/main.ts index 292d31e3..06f8a852 100644 --- a/packages/mom/src/main.ts +++ b/packages/mom/src/main.ts @@ -2,7 +2,6 @@ import { join, resolve } from "path"; import { type AgentRunner, getOrCreateRunner } from "./agent.js"; -import { syncLogToContext } from "./context.js"; import { downloadChannel } from "./download.js"; import { createEventsWatcher } from "./events.js"; import * as log from "./log.js"; @@ -254,7 +253,6 @@ const handler: MomHandler = { async handleEvent(event: SlackEvent, slack: SlackBot, isEvent?: boolean): Promise { const state = getState(event.channel); - const channelDir = join(workingDir, event.channel); // Start run state.running = true; @@ -263,14 +261,6 @@ const handler: MomHandler = { log.logInfo(`[${event.channel}] Starting run: ${event.text.substring(0, 50)}`); try { - // SYNC context from log.jsonl BEFORE processing - // This adds any messages that were logged while mom wasn't running - // Exclude messages >= current ts (will be handled by agent) - const syncedCount = syncLogToContext(channelDir, event.ts); - if (syncedCount > 0) { - log.logInfo(`[${event.channel}] Synced ${syncedCount} messages from log to context`); - } - // Create context adapter const ctx = createSlackContext(event, slack, state, isEvent);