mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-15 10:05:14 +00:00
fix(mom): use coding-agent SessionManager instead of custom MomSessionManager
Removes MomSessionManager which was missing methods expected by AgentSession (appendMessage, getBranch, etc.), causing runtime crashes. Now uses the standard SessionManager from coding-agent with a fixed context.jsonl path per channel. The syncLogToSessionManager function handles syncing messages from log.jsonl using SessionManager's public API. Ref #595
This commit is contained in:
parent
6db2d0770a
commit
f0fd0a7d6a
4 changed files with 119 additions and 463 deletions
|
|
@ -2,6 +2,10 @@
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
### Fixed
|
||||
|
||||
- Use coding-agent's SessionManager instead of custom MomSessionManager to fix API mismatch crash ([#595](https://github.com/badlogic/pi-mono/issues/595))
|
||||
|
||||
## [0.42.4] - 2026-01-10
|
||||
|
||||
## [0.42.3] - 2026-01-10
|
||||
|
|
|
|||
|
|
@ -7,13 +7,14 @@ import {
|
|||
formatSkillsForPrompt,
|
||||
loadSkillsFromDir,
|
||||
ModelRegistry,
|
||||
SessionManager,
|
||||
type Skill,
|
||||
} from "@mariozechner/pi-coding-agent";
|
||||
import { existsSync, readFileSync } from "fs";
|
||||
import { mkdir, writeFile } from "fs/promises";
|
||||
import { homedir } from "os";
|
||||
import { join } from "path";
|
||||
import { MomSessionManager, MomSettingsManager } from "./context.js";
|
||||
import { MomSettingsManager, syncLogToSessionManager } from "./context.js";
|
||||
import * as log from "./log.js";
|
||||
import { createExecutor, type SandboxConfig } from "./sandbox.js";
|
||||
import type { ChannelInfo, SlackContext, UserInfo } from "./slack.js";
|
||||
|
|
@ -418,7 +419,9 @@ function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDi
|
|||
const systemPrompt = buildSystemPrompt(workspacePath, channelId, memory, sandboxConfig, [], [], skills);
|
||||
|
||||
// Create session manager and settings manager
|
||||
const sessionManager = new MomSessionManager(channelDir);
|
||||
// Use a fixed context.jsonl file per channel (not timestamped like coding-agent)
|
||||
const contextFile = join(channelDir, "context.jsonl");
|
||||
const sessionManager = SessionManager.open(contextFile, channelDir);
|
||||
const settingsManager = new MomSettingsManager(join(channelDir, ".."));
|
||||
|
||||
// Create AuthStorage and ModelRegistry
|
||||
|
|
@ -439,7 +442,7 @@ function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDi
|
|||
});
|
||||
|
||||
// Load existing messages
|
||||
const loadedSession = sessionManager.buildSessionContex();
|
||||
const loadedSession = sessionManager.buildSessionContext();
|
||||
if (loadedSession.messages.length > 0) {
|
||||
agent.replaceMessages(loadedSession.messages);
|
||||
log.logInfo(`[${channelId}] Loaded ${loadedSession.messages.length} messages from context.jsonl`);
|
||||
|
|
@ -448,7 +451,7 @@ function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDi
|
|||
// Create AgentSession wrapper
|
||||
const session = new AgentSession({
|
||||
agent,
|
||||
sessionManager: sessionManager as any,
|
||||
sessionManager,
|
||||
settingsManager: settingsManager as any,
|
||||
modelRegistry,
|
||||
});
|
||||
|
|
@ -624,9 +627,16 @@ function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDi
|
|||
// Ensure channel directory exists
|
||||
await mkdir(channelDir, { recursive: true });
|
||||
|
||||
// Sync messages from log.jsonl that arrived while we were offline or busy
|
||||
// Exclude the current message (it will be added via prompt())
|
||||
const syncedCount = syncLogToSessionManager(sessionManager, channelDir, ctx.message.ts);
|
||||
if (syncedCount > 0) {
|
||||
log.logInfo(`[${channelId}] Synced ${syncedCount} messages from log.jsonl`);
|
||||
}
|
||||
|
||||
// Reload messages from context.jsonl
|
||||
// This picks up any messages synced from log.jsonl before this run
|
||||
const reloadedSession = sessionManager.buildSessionContex();
|
||||
// This picks up any messages synced above
|
||||
const reloadedSession = sessionManager.buildSessionContext();
|
||||
if (reloadedSession.messages.length > 0) {
|
||||
agent.replaceMessages(reloadedSession.messages);
|
||||
log.logInfo(`[${channelId}] Reloaded ${reloadedSession.messages.length} messages from context`);
|
||||
|
|
|
|||
|
|
@ -6,363 +6,139 @@
|
|||
* - log.jsonl: Human-readable channel history for grep (no tool results)
|
||||
*
|
||||
* This module provides:
|
||||
* - MomSessionManager: Adapts coding-agent's SessionManager for channel-based storage
|
||||
* - syncLogToSessionManager: Syncs messages from log.jsonl to SessionManager
|
||||
* - MomSettingsManager: Simple settings for mom (compaction, retry, model preferences)
|
||||
*/
|
||||
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import {
|
||||
buildSessionContext,
|
||||
type CompactionEntry,
|
||||
type FileEntry,
|
||||
type ModelChangeEntry,
|
||||
type SessionContext,
|
||||
type SessionEntry,
|
||||
type SessionEntryBase,
|
||||
type SessionMessageEntry,
|
||||
type ThinkingLevelChangeEntry,
|
||||
} from "@mariozechner/pi-coding-agent";
|
||||
import { randomBytes } from "crypto";
|
||||
import { appendFileSync, existsSync, mkdirSync, readFileSync, writeFileSync } from "fs";
|
||||
import type { UserMessage } from "@mariozechner/pi-ai";
|
||||
import type { SessionManager, SessionMessageEntry } from "@mariozechner/pi-coding-agent";
|
||||
import { existsSync, mkdirSync, readFileSync, writeFileSync } from "fs";
|
||||
import { dirname, join } from "path";
|
||||
|
||||
function uuidv4(): string {
|
||||
const bytes = randomBytes(16);
|
||||
bytes[6] = (bytes[6] & 0x0f) | 0x40;
|
||||
bytes[8] = (bytes[8] & 0x3f) | 0x80;
|
||||
const hex = bytes.toString("hex");
|
||||
return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20, 32)}`;
|
||||
// ============================================================================
|
||||
// Sync log.jsonl to SessionManager
|
||||
// ============================================================================
|
||||
|
||||
interface LogMessage {
|
||||
date?: string;
|
||||
ts?: string;
|
||||
user?: string;
|
||||
userName?: string;
|
||||
text?: string;
|
||||
isBot?: boolean;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// MomSessionManager - Channel-based session management
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Session manager for mom, storing context per Slack channel.
|
||||
* Sync user messages from log.jsonl to SessionManager.
|
||||
*
|
||||
* Unlike coding-agent which creates timestamped session files, mom uses
|
||||
* a single context.jsonl per channel that persists across all @mentions.
|
||||
* This ensures that messages logged while mom wasn't running (channel chatter,
|
||||
* backfilled messages, messages while busy) are added to the LLM context.
|
||||
*
|
||||
* @param sessionManager - The SessionManager to sync to
|
||||
* @param channelDir - Path to channel directory containing log.jsonl
|
||||
* @param excludeSlackTs - Slack timestamp of current message (will be added via prompt(), not sync)
|
||||
* @returns Number of messages synced
|
||||
*/
|
||||
export class MomSessionManager {
|
||||
private sessionId: string;
|
||||
private contextFile: string;
|
||||
private logFile: string;
|
||||
private channelDir: string;
|
||||
private flushed: boolean = false;
|
||||
private inMemoryEntries: FileEntry[] = [];
|
||||
private leafId: string | null = null;
|
||||
export function syncLogToSessionManager(
|
||||
sessionManager: SessionManager,
|
||||
channelDir: string,
|
||||
excludeSlackTs?: string,
|
||||
): number {
|
||||
const logFile = join(channelDir, "log.jsonl");
|
||||
|
||||
constructor(channelDir: string) {
|
||||
this.channelDir = channelDir;
|
||||
this.contextFile = join(channelDir, "context.jsonl");
|
||||
this.logFile = join(channelDir, "log.jsonl");
|
||||
if (!existsSync(logFile)) return 0;
|
||||
|
||||
// Ensure channel directory exists
|
||||
if (!existsSync(channelDir)) {
|
||||
mkdirSync(channelDir, { recursive: true });
|
||||
}
|
||||
|
||||
// Load existing session or create new
|
||||
if (existsSync(this.contextFile)) {
|
||||
this.inMemoryEntries = this.loadEntriesFromFile();
|
||||
this.sessionId = this.extractSessionId() || uuidv4();
|
||||
this._updateLeafId();
|
||||
this.flushed = true;
|
||||
} else {
|
||||
this.sessionId = uuidv4();
|
||||
this.inMemoryEntries = [
|
||||
{
|
||||
type: "session",
|
||||
version: 2,
|
||||
id: this.sessionId,
|
||||
timestamp: new Date().toISOString(),
|
||||
cwd: this.channelDir,
|
||||
},
|
||||
];
|
||||
}
|
||||
// Note: syncFromLog() is called explicitly from agent.ts with excludeTimestamp
|
||||
}
|
||||
|
||||
private _updateLeafId(): void {
|
||||
for (let i = this.inMemoryEntries.length - 1; i >= 0; i--) {
|
||||
const entry = this.inMemoryEntries[i];
|
||||
if (entry.type !== "session") {
|
||||
this.leafId = entry.id;
|
||||
return;
|
||||
}
|
||||
}
|
||||
this.leafId = null;
|
||||
}
|
||||
|
||||
private _createEntryBase(): Omit<SessionEntryBase, "type"> {
|
||||
const id = uuidv4();
|
||||
const base = {
|
||||
id,
|
||||
parentId: this.leafId,
|
||||
timestamp: new Date().toISOString(),
|
||||
};
|
||||
this.leafId = id;
|
||||
return base;
|
||||
}
|
||||
|
||||
private _persist(entry: SessionEntry): void {
|
||||
const hasAssistant = this.inMemoryEntries.some((e) => e.type === "message" && e.message.role === "assistant");
|
||||
if (!hasAssistant) return;
|
||||
|
||||
if (!this.flushed) {
|
||||
for (const e of this.inMemoryEntries) {
|
||||
appendFileSync(this.contextFile, `${JSON.stringify(e)}\n`);
|
||||
}
|
||||
this.flushed = true;
|
||||
} else {
|
||||
appendFileSync(this.contextFile, `${JSON.stringify(entry)}\n`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync user messages from log.jsonl that aren't in context.jsonl.
|
||||
*
|
||||
* log.jsonl and context.jsonl must have the same user messages.
|
||||
* This handles:
|
||||
* - Backfilled messages (mom was offline)
|
||||
* - Messages that arrived while mom was processing a previous turn
|
||||
* - Channel chatter between @mentions
|
||||
*
|
||||
* Channel chatter is formatted as "[username]: message" to distinguish from direct @mentions.
|
||||
*
|
||||
* Called before each agent run.
|
||||
*
|
||||
* @param excludeSlackTs Slack timestamp of current message (will be added via prompt(), not sync)
|
||||
*/
|
||||
syncFromLog(excludeSlackTs?: string): void {
|
||||
if (!existsSync(this.logFile)) return;
|
||||
|
||||
// Build set of Slack timestamps already in context
|
||||
// We store slackTs in the message content or can extract from formatted messages
|
||||
// For messages synced from log, we use the log's date as the entry timestamp
|
||||
// For messages added via prompt(), they have different timestamps
|
||||
// So we need to match by content OR by stored slackTs
|
||||
const contextSlackTimestamps = new Set<string>();
|
||||
const contextMessageTexts = new Set<string>();
|
||||
|
||||
for (const entry of this.inMemoryEntries) {
|
||||
if (entry.type === "message") {
|
||||
const msgEntry = entry as SessionMessageEntry;
|
||||
// Store the entry timestamp (which is the log date for synced messages)
|
||||
contextSlackTimestamps.add(entry.timestamp);
|
||||
|
||||
// Also store message text to catch duplicates added via prompt()
|
||||
// AgentMessage has different shapes, check for content property
|
||||
const msg = msgEntry.message as { role: string; content?: unknown };
|
||||
if (msg.role === "user" && msg.content !== undefined) {
|
||||
const content = msg.content;
|
||||
if (typeof content === "string") {
|
||||
contextMessageTexts.add(content);
|
||||
} else if (Array.isArray(content)) {
|
||||
for (const part of content) {
|
||||
if (
|
||||
typeof part === "object" &&
|
||||
part !== null &&
|
||||
"type" in part &&
|
||||
part.type === "text" &&
|
||||
"text" in part
|
||||
) {
|
||||
contextMessageTexts.add((part as { type: "text"; text: string }).text);
|
||||
// Build set of existing message content from session
|
||||
const existingMessages = new Set<string>();
|
||||
for (const entry of sessionManager.getEntries()) {
|
||||
if (entry.type === "message") {
|
||||
const msgEntry = entry as SessionMessageEntry;
|
||||
const msg = msgEntry.message as { role: string; content?: unknown };
|
||||
if (msg.role === "user" && msg.content !== undefined) {
|
||||
const content = msg.content;
|
||||
if (typeof content === "string") {
|
||||
// Strip timestamp prefix for comparison (live messages have it, synced don't)
|
||||
// Format: [YYYY-MM-DD HH:MM:SS+HH:MM] [username]: text
|
||||
let normalized = content.replace(/^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}\] /, "");
|
||||
// Strip attachments section
|
||||
const attachmentsIdx = normalized.indexOf("\n\n<slack_attachments>\n");
|
||||
if (attachmentsIdx !== -1) {
|
||||
normalized = normalized.substring(0, attachmentsIdx);
|
||||
}
|
||||
existingMessages.add(normalized);
|
||||
} else if (Array.isArray(content)) {
|
||||
for (const part of content) {
|
||||
if (
|
||||
typeof part === "object" &&
|
||||
part !== null &&
|
||||
"type" in part &&
|
||||
part.type === "text" &&
|
||||
"text" in part
|
||||
) {
|
||||
let normalized = (part as { type: "text"; text: string }).text;
|
||||
normalized = normalized.replace(/^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}\] /, "");
|
||||
const attachmentsIdx = normalized.indexOf("\n\n<slack_attachments>\n");
|
||||
if (attachmentsIdx !== -1) {
|
||||
normalized = normalized.substring(0, attachmentsIdx);
|
||||
}
|
||||
existingMessages.add(normalized);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Read log.jsonl and find user messages not in context
|
||||
const logContent = readFileSync(this.logFile, "utf-8");
|
||||
const logLines = logContent.trim().split("\n").filter(Boolean);
|
||||
// Read log.jsonl and find user messages not in context
|
||||
const logContent = readFileSync(logFile, "utf-8");
|
||||
const logLines = logContent.trim().split("\n").filter(Boolean);
|
||||
|
||||
interface LogMessage {
|
||||
date?: string;
|
||||
ts?: string;
|
||||
user?: string;
|
||||
userName?: string;
|
||||
text?: string;
|
||||
isBot?: boolean;
|
||||
}
|
||||
const newMessages: Array<{ timestamp: number; message: UserMessage }> = [];
|
||||
|
||||
const newMessages: Array<{ timestamp: string; slackTs: string; message: AgentMessage }> = [];
|
||||
for (const line of logLines) {
|
||||
try {
|
||||
const logMsg: LogMessage = JSON.parse(line);
|
||||
|
||||
for (const line of logLines) {
|
||||
try {
|
||||
const logMsg: LogMessage = JSON.parse(line);
|
||||
const slackTs = logMsg.ts;
|
||||
const date = logMsg.date;
|
||||
if (!slackTs || !date) continue;
|
||||
|
||||
const slackTs = logMsg.ts;
|
||||
const date = logMsg.date;
|
||||
if (!slackTs || !date) continue;
|
||||
// Skip the current message being processed (will be added via prompt())
|
||||
if (excludeSlackTs && slackTs === excludeSlackTs) continue;
|
||||
|
||||
// Skip the current message being processed (will be added via prompt())
|
||||
if (excludeSlackTs && slackTs === excludeSlackTs) continue;
|
||||
// Skip bot messages - added through agent flow
|
||||
if (logMsg.isBot) continue;
|
||||
|
||||
// Skip bot messages - added through agent flow
|
||||
if (logMsg.isBot) continue;
|
||||
// Build the message text as it would appear in context
|
||||
const messageText = `[${logMsg.userName || logMsg.user || "unknown"}]: ${logMsg.text || ""}`;
|
||||
|
||||
// Skip if this date is already in context (was synced before)
|
||||
if (contextSlackTimestamps.has(date)) continue;
|
||||
// Skip if this exact message text is already in context
|
||||
if (existingMessages.has(messageText)) continue;
|
||||
|
||||
// Build the message text as it would appear in context
|
||||
const messageText = `[${logMsg.userName || logMsg.user || "unknown"}]: ${logMsg.text || ""}`;
|
||||
|
||||
// Skip if this exact message text is already in context (added via prompt())
|
||||
if (contextMessageTexts.has(messageText)) continue;
|
||||
|
||||
const msgTime = new Date(date).getTime() || Date.now();
|
||||
const userMessage: AgentMessage = {
|
||||
role: "user",
|
||||
content: messageText,
|
||||
timestamp: msgTime,
|
||||
};
|
||||
|
||||
newMessages.push({ timestamp: date, slackTs, message: userMessage });
|
||||
} catch {
|
||||
// Skip malformed lines
|
||||
}
|
||||
}
|
||||
|
||||
if (newMessages.length === 0) return;
|
||||
|
||||
// Sort by timestamp and add to context
|
||||
newMessages.sort((a, b) => new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime());
|
||||
|
||||
for (const { timestamp, message } of newMessages) {
|
||||
const id = uuidv4();
|
||||
const entry: SessionMessageEntry = {
|
||||
type: "message",
|
||||
id,
|
||||
parentId: this.leafId,
|
||||
timestamp, // Use log date as entry timestamp for consistent deduplication
|
||||
message,
|
||||
const msgTime = new Date(date).getTime() || Date.now();
|
||||
const userMessage: UserMessage = {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: messageText }],
|
||||
timestamp: msgTime,
|
||||
};
|
||||
this.leafId = id;
|
||||
|
||||
this.inMemoryEntries.push(entry);
|
||||
appendFileSync(this.contextFile, `${JSON.stringify(entry)}\n`);
|
||||
newMessages.push({ timestamp: msgTime, message: userMessage });
|
||||
existingMessages.add(messageText); // Track to avoid duplicates within this sync
|
||||
} catch {
|
||||
// Skip malformed lines
|
||||
}
|
||||
}
|
||||
|
||||
private extractSessionId(): string | null {
|
||||
for (const entry of this.inMemoryEntries) {
|
||||
if (entry.type === "session") {
|
||||
return entry.id;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
if (newMessages.length === 0) return 0;
|
||||
|
||||
// Sort by timestamp and add to session
|
||||
newMessages.sort((a, b) => a.timestamp - b.timestamp);
|
||||
|
||||
for (const { message } of newMessages) {
|
||||
sessionManager.appendMessage(message);
|
||||
}
|
||||
|
||||
private loadEntriesFromFile(): FileEntry[] {
|
||||
if (!existsSync(this.contextFile)) return [];
|
||||
|
||||
const content = readFileSync(this.contextFile, "utf8");
|
||||
const entries: FileEntry[] = [];
|
||||
const lines = content.trim().split("\n");
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
try {
|
||||
const entry = JSON.parse(line) as FileEntry;
|
||||
entries.push(entry);
|
||||
} catch {
|
||||
// Skip malformed lines
|
||||
}
|
||||
}
|
||||
|
||||
return entries;
|
||||
}
|
||||
|
||||
saveMessage(message: AgentMessage): void {
|
||||
const entry: SessionMessageEntry = { ...this._createEntryBase(), type: "message", message };
|
||||
this.inMemoryEntries.push(entry);
|
||||
this._persist(entry);
|
||||
}
|
||||
|
||||
saveThinkingLevelChange(thinkingLevel: string): void {
|
||||
const entry: ThinkingLevelChangeEntry = {
|
||||
...this._createEntryBase(),
|
||||
type: "thinking_level_change",
|
||||
thinkingLevel,
|
||||
};
|
||||
this.inMemoryEntries.push(entry);
|
||||
this._persist(entry);
|
||||
}
|
||||
|
||||
saveModelChange(provider: string, modelId: string): void {
|
||||
const entry: ModelChangeEntry = { ...this._createEntryBase(), type: "model_change", provider, modelId };
|
||||
this.inMemoryEntries.push(entry);
|
||||
this._persist(entry);
|
||||
}
|
||||
|
||||
saveCompaction(entry: CompactionEntry): void {
|
||||
this.inMemoryEntries.push(entry);
|
||||
this._persist(entry);
|
||||
}
|
||||
|
||||
/** Load session with compaction support */
|
||||
buildSessionContex(): SessionContext {
|
||||
const entries = this.loadEntries();
|
||||
return buildSessionContext(entries);
|
||||
}
|
||||
|
||||
loadEntries(): SessionEntry[] {
|
||||
// Re-read from file to get latest state
|
||||
const entries = existsSync(this.contextFile) ? this.loadEntriesFromFile() : this.inMemoryEntries;
|
||||
return entries.filter((e): e is SessionEntry => e.type !== "session");
|
||||
}
|
||||
|
||||
getSessionId(): string {
|
||||
return this.sessionId;
|
||||
}
|
||||
|
||||
getSessionFile(): string {
|
||||
return this.contextFile;
|
||||
}
|
||||
|
||||
/** Reset session (clears context.jsonl) */
|
||||
reset(): void {
|
||||
this.sessionId = uuidv4();
|
||||
this.flushed = false;
|
||||
this.inMemoryEntries = [
|
||||
{
|
||||
type: "session",
|
||||
id: this.sessionId,
|
||||
timestamp: new Date().toISOString(),
|
||||
cwd: this.channelDir,
|
||||
},
|
||||
];
|
||||
// Truncate the context file
|
||||
if (existsSync(this.contextFile)) {
|
||||
writeFileSync(this.contextFile, "");
|
||||
}
|
||||
}
|
||||
|
||||
// Compatibility methods for AgentSession
|
||||
isPersisted(): boolean {
|
||||
return true;
|
||||
}
|
||||
|
||||
setSessionFile(_path: string): void {
|
||||
// No-op for mom - we always use the channel's context.jsonl
|
||||
}
|
||||
|
||||
loadModel(): { provider: string; modelId: string } | null {
|
||||
return this.buildSessionContex().model;
|
||||
}
|
||||
|
||||
loadThinkingLevel(): string {
|
||||
return this.buildSessionContex().thinkingLevel;
|
||||
}
|
||||
|
||||
/** Not used by mom but required by AgentSession interface */
|
||||
createBranchedSession(_leafId: string): string | null {
|
||||
return null; // Mom doesn't support branching
|
||||
}
|
||||
return newMessages.length;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
|
|
@ -519,127 +295,3 @@ export class MomSettingsManager {
|
|||
return 30000;
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Sync log.jsonl to context.jsonl
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Sync user messages from log.jsonl to context.jsonl.
|
||||
*
|
||||
* This ensures that messages logged while mom wasn't running (channel chatter,
|
||||
* backfilled messages, messages while busy) are added to the LLM context.
|
||||
*
|
||||
* @param channelDir - Path to channel directory
|
||||
* @param excludeAfterTs - Don't sync messages with ts >= this value (they'll be handled by agent)
|
||||
* @returns Number of messages synced
|
||||
*/
|
||||
export function syncLogToContext(channelDir: string, excludeAfterTs?: string): number {
|
||||
const logFile = join(channelDir, "log.jsonl");
|
||||
const contextFile = join(channelDir, "context.jsonl");
|
||||
|
||||
if (!existsSync(logFile)) return 0;
|
||||
|
||||
// Read all user messages from log.jsonl
|
||||
const logContent = readFileSync(logFile, "utf-8");
|
||||
const logLines = logContent.trim().split("\n").filter(Boolean);
|
||||
|
||||
interface LogEntry {
|
||||
ts: string;
|
||||
user: string;
|
||||
userName?: string;
|
||||
text: string;
|
||||
isBot: boolean;
|
||||
}
|
||||
|
||||
const logMessages: LogEntry[] = [];
|
||||
for (const line of logLines) {
|
||||
try {
|
||||
const entry = JSON.parse(line) as LogEntry;
|
||||
// Only sync user messages (not bot responses)
|
||||
if (!entry.isBot && entry.ts && entry.text) {
|
||||
// Skip if >= excludeAfterTs
|
||||
if (excludeAfterTs && entry.ts >= excludeAfterTs) continue;
|
||||
logMessages.push(entry);
|
||||
}
|
||||
} catch {}
|
||||
}
|
||||
|
||||
if (logMessages.length === 0) return 0;
|
||||
|
||||
// Read existing timestamps from context.jsonl
|
||||
if (existsSync(contextFile)) {
|
||||
const contextContent = readFileSync(contextFile, "utf-8");
|
||||
const contextLines = contextContent.trim().split("\n").filter(Boolean);
|
||||
for (const line of contextLines) {
|
||||
try {
|
||||
const entry = JSON.parse(line);
|
||||
if (entry.type === "message" && entry.message?.role === "user" && entry.message?.timestamp) {
|
||||
// Extract ts from timestamp (ms -> slack ts format for comparison)
|
||||
// We store the original slack ts in a way we can recover
|
||||
// Actually, let's just check by content match since ts formats differ
|
||||
}
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
|
||||
// For deduplication, we need to track what's already in context
|
||||
// Read context and extract user message content (strip attachments section for comparison)
|
||||
const existingMessages = new Set<string>();
|
||||
if (existsSync(contextFile)) {
|
||||
const contextContent = readFileSync(contextFile, "utf-8");
|
||||
const contextLines = contextContent.trim().split("\n").filter(Boolean);
|
||||
for (const line of contextLines) {
|
||||
try {
|
||||
const entry = JSON.parse(line);
|
||||
if (entry.type === "message" && entry.message?.role === "user") {
|
||||
let content =
|
||||
typeof entry.message.content === "string" ? entry.message.content : entry.message.content?.[0]?.text;
|
||||
if (content) {
|
||||
// Strip timestamp prefix for comparison (live messages have it, log messages don't)
|
||||
// Format: [YYYY-MM-DD HH:MM:SS+HH:MM] [username]: text
|
||||
content = content.replace(/^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}\] /, "");
|
||||
// Strip attachments section for comparison (live messages have it, log messages don't)
|
||||
const attachmentsIdx = content.indexOf("\n\n<slack_attachments>\n");
|
||||
if (attachmentsIdx !== -1) {
|
||||
content = content.substring(0, attachmentsIdx);
|
||||
}
|
||||
existingMessages.add(content);
|
||||
}
|
||||
}
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
|
||||
// Add missing messages to context.jsonl
|
||||
let syncedCount = 0;
|
||||
for (const msg of logMessages) {
|
||||
const userName = msg.userName || msg.user;
|
||||
const content = `[${userName}]: ${msg.text}`;
|
||||
|
||||
// Skip if already in context
|
||||
if (existingMessages.has(content)) continue;
|
||||
|
||||
const timestamp = Math.floor(parseFloat(msg.ts) * 1000);
|
||||
const entry = {
|
||||
type: "message",
|
||||
timestamp: new Date(timestamp).toISOString(),
|
||||
message: {
|
||||
role: "user",
|
||||
content,
|
||||
timestamp,
|
||||
},
|
||||
};
|
||||
|
||||
// Ensure directory exists
|
||||
if (!existsSync(channelDir)) {
|
||||
mkdirSync(channelDir, { recursive: true });
|
||||
}
|
||||
|
||||
appendFileSync(contextFile, `${JSON.stringify(entry)}\n`);
|
||||
existingMessages.add(content); // Track to avoid duplicates within this sync
|
||||
syncedCount++;
|
||||
}
|
||||
|
||||
return syncedCount;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
import { join, resolve } from "path";
|
||||
import { type AgentRunner, getOrCreateRunner } from "./agent.js";
|
||||
import { syncLogToContext } from "./context.js";
|
||||
import { downloadChannel } from "./download.js";
|
||||
import { createEventsWatcher } from "./events.js";
|
||||
import * as log from "./log.js";
|
||||
|
|
@ -254,7 +253,6 @@ const handler: MomHandler = {
|
|||
|
||||
async handleEvent(event: SlackEvent, slack: SlackBot, isEvent?: boolean): Promise<void> {
|
||||
const state = getState(event.channel);
|
||||
const channelDir = join(workingDir, event.channel);
|
||||
|
||||
// Start run
|
||||
state.running = true;
|
||||
|
|
@ -263,14 +261,6 @@ const handler: MomHandler = {
|
|||
log.logInfo(`[${event.channel}] Starting run: ${event.text.substring(0, 50)}`);
|
||||
|
||||
try {
|
||||
// SYNC context from log.jsonl BEFORE processing
|
||||
// This adds any messages that were logged while mom wasn't running
|
||||
// Exclude messages >= current ts (will be handled by agent)
|
||||
const syncedCount = syncLogToContext(channelDir, event.ts);
|
||||
if (syncedCount > 0) {
|
||||
log.logInfo(`[${event.channel}] Synced ${syncedCount} messages from log to context`);
|
||||
}
|
||||
|
||||
// Create context adapter
|
||||
const ctx = createSlackContext(event, slack, state, isEvent);
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue