From 99fe4802ef85d32614d3f21ff95f9a01e28255e0 Mon Sep 17 00:00:00 2001 From: Mario Zechner Date: Thu, 11 Dec 2025 20:13:29 +0100 Subject: [PATCH] mom: rewrite message handling - log.jsonl and context.jsonl sync - log.jsonl is source of truth, context.jsonl syncs from it at run start - Backfill fetches missing messages from Slack API on startup - Messages sent while mom is busy are logged and synced on next run - Channel chatter (no @mention) logged but doesn't trigger processing - Pre-startup messages (replayed by Slack) logged but not processed - Stop command executes immediately, not queued - Session header written immediately on new session creation - Deduplicate messages by timestamp - Strip @mentions from backfilled messages - Remove old slack.ts and main.ts, rename *-new.ts versions --- packages/mom/out.html | 66 --- packages/mom/src/agent.ts | 601 ++++++++++++----------- packages/mom/src/context.ts | 214 ++++++++- packages/mom/src/main.ts | 304 ++++++++---- packages/mom/src/slack.ts | 926 +++++++++++++++++------------------- 5 files changed, 1142 insertions(+), 969 deletions(-) delete mode 100644 packages/mom/out.html diff --git a/packages/mom/out.html b/packages/mom/out.html deleted file mode 100644 index eca59864..00000000 --- a/packages/mom/out.html +++ /dev/null @@ -1,66 +0,0 @@ - - - - - - - - - rust programming - Brave Search - - - -
🌐
Rust
rust-lang.org
Rust Programming Language
Rust has great documentation, a friendly compiler with useful error messages, and top-notch tooling β€” an integrated package manager and build tool, smart multi-editor support with auto-completion and type inspections, an auto-formatter, and more.
Learn
A language empowering everyone to build reliable and efficient software.
Tools
A language empowering everyone to build reliable and efficient software.
Community
A language empowering everyone to build reliable and efficient software.
Install
A language empowering everyone to build reliable and efficient software.

memory-safe programming language without garbage collection

Rust is a general-purpose programming language. It is noted for its emphasis on performance, type safety, concurrency, and memory safety. Rust supports multiple programming paradigms. It was influenced by ideas from functional … Wikipedia
Factsheet
Developer The Rust Team
First appeared January 19, 2012; 13 years ago (2012-01-19)
Factsheet
Developer The Rust Team
First appeared January 19, 2012; 13 years ago (2012-01-19)
🌐
Wikipedia
en.wikipedia.org β€Ί wiki β€Ί Rust_(programming_language)
Rust (programming language) - Wikipedia
1 day ago - Rust is a general-purpose programming language. It is noted for its emphasis on performance, type safety, concurrency, and memory safety. Rust supports multiple programming paradigms. It was influenced by ideas from functional programming, including immutability, higher-order functions, algebraic ...
🌐
Rust Programming Language
rust-lang.org β€Ί learn
Learn Rust - Rust Programming Language
Affectionately nicknamed β€œthe book,” The Rust Programming Language will give you an overview of the language from first principles.
🌐
W3Schools
w3schools.com β€Ί rust
Rust Tutorial
Rust is a popular programming language used to build everything from web servers to game engines.
🌐
The Rust Programming Language
doc.rust-lang.org β€Ί book β€Ί ch00-00-introduction.html
Introduction - The Rust Programming Language
Welcome to The Rust Programming Language, an introductory book about Rust. The Rust programming language helps you write faster, more reliable software. High-level ergonomics and low-level control are often at odds in programming language design; Rust challenges that conflict.
🌐
Medium
medium.com β€Ί codex β€Ί rust-101-everything-you-need-to-know-about-rust-f3dd0ae99f4c
Rust 101 β€” Everything you need to know about Rust | by Nishant Aanjaney Jalan | CodeX | Medium
February 25, 2023 - Code should be able to run without crashing first time β€” Yes, you heard me. Rust is a statically typed language, and based on the way it is constructed, it leaves almost (I say, almost!) no room for your program to crash.
🌐
Wikipedia
de.wikipedia.org β€Ί wiki β€Ί Rust_(Programmiersprache)
Rust (Programmiersprache) – Wikipedia
May 16, 2015 - Rust vereint AnsΓ€tze aus verschiedenen Programmierparadigmen, unter anderem aus der funktionalen, der objektorientierten und der nebenlΓ€ufigen Programmierung und erlaubt so ein hohes Abstraktionsniveau. Beispielsweise gibt es in Rust algebraische Datentypen, Pattern Matching, Traits (Γ€hnlich den Typklassen in Haskell), Closures sowie UnterstΓΌtzung fΓΌr RAII.
🌐
Reddit
reddit.com β€Ί r β€Ί rust
The Rust Programming Language
December 2, 2010 - A place for all things related to the Rust programming languageβ€”an open-source systems language that emphasizes performance, reliability, and productivity.
Find elsewhere
🌐
GeeksforGeeks
geeksforgeeks.org β€Ί rust β€Ί introduction-to-rust-programming-language
Introduction to Rust Programming Language - GeeksforGeeks
July 23, 2025 - Rust is using Rust which means that all the standard compiler libraries are written in rust; there is a bit of use of the C programming language but most of it is Rust.
🌐
The Rust Programming Language
doc.rust-lang.org β€Ί book
The Rust Programming Language - The Rust Programming Language
See the β€œInstallation” section of Chapter 1 to install or update Rust.
🌐
Stack Overflow
stackoverflow.blog β€Ί 2020 β€Ί 01 β€Ί 20 β€Ί what-is-rust-and-why-is-it-so-popular
What is Rust and why is it so popular? - Stack Overflow
This means any value may be what it says or nothing, effectively creating a second possible type for every type. Like Haskell and some other modern programming languages, Rust encodes this possibility using an optional type, and the compiler requires you to handle the None case.
🌐
YouTube
youtube.com β€Ί playlist
Rust Programming Tutorial πŸ¦€ - YouTube
Learn Rust Programming in these videos! πŸ¦€ We will cover the fundamental concepts behind the Rust language, such as control flow statements, variables, iterat...
🌐
CodiLime
codilime.com β€Ί blog β€Ί software development β€Ί backend β€Ί rust programming language - what is rust used for and why is so popular? - codilime
Rust programming language - what is rust used for and why is so popular? - CodiLime
Rust is a statically-typed programming language designed for performance and safety, especially safe concurrency and memory management. Its syntax is similar to that of C++. It is an open-source project developed originally at Mozilla Research.
🌐
GitHub
github.com β€Ί rust-lang β€Ί rust
GitHub - rust-lang/rust: Empowering everyone to build reliable and efficient software.
This is the main source code repository for Rust.
Starred by 108K users
Forked by 14.1K users
Languages Β  Rust 89.9% | HTML 7.2% | Shell 0.7% | JavaScript 0.5% | C 0.4% | Python 0.3%
🌐
Zero To Mastery
zerotomastery.io β€Ί home β€Ί courses β€Ί rust programming: the complete developer's guide
Learn Rust Programming: The Complete Developer's Guide | Zero To Mastery
Foundational computer science topics such as computer memory, program logic, and simple data structures Β· Working with data: enums, structs, tuples, expressions, optional data and more Β· Solid understanding of all core concepts of the Rust programming language such as: memory, mutability, traits, slices, and generics
🌐
MIT
web.mit.edu β€Ί rust-lang_v1.25 β€Ί arch β€Ί amd64_ubuntu1404 β€Ί share β€Ί doc β€Ί rust β€Ί html β€Ί book β€Ί first-edition β€Ί getting-started.html
Getting Started - The Rust Programming Language
Now, let’s go over what just happened in your "Hello, world!" program in detail. Here's the first piece of the puzzle: ... These lines define a function in Rust. The main function is special: it's the beginning of every Rust program.
🌐
Codecademy
codecademy.com β€Ί learn β€Ί rust-for-programmers
Rust for Programmers | Codecademy
A quick primer on the fundamentals of the Rust programming language for experienced programmers.
Rating: 3.9 ​ - ​ - 155 - votes
🌐
Programiz
programiz.com β€Ί rust
Learn Rust
It has been voted one of the most loved programming languages for more than five years in StackOverflow's survey. Rust is specifically designed to address the safety issues found in older languages like C and C++, by preventing memory leaks and common errors, making it a secure choice for system programming.
🌐
O'Reilly
oreilly.com β€Ί library β€Ί view β€Ί programming-rust-2nd β€Ί 9781492052586
Programming Rust, 2nd Edition [Book]
The Rust systems programming language combines that control with a modern type system that catches broad classes of common mistakes, from memory management errors to data races between threads.
🌐
GitHub
github.com β€Ί rust-lang
The Rust Programming Language Β· GitHub
The Rust Programming Language has 232 repositories available. Follow their code on GitHub.
- - -
- - - diff --git a/packages/mom/src/agent.ts b/packages/mom/src/agent.ts index e9d25d13..5992e241 100644 --- a/packages/mom/src/agent.ts +++ b/packages/mom/src/agent.ts @@ -2,7 +2,7 @@ import { Agent, type AgentEvent, ProviderTransport } from "@mariozechner/pi-agen import { getModel } from "@mariozechner/pi-ai"; import { AgentSession, messageTransformer } from "@mariozechner/pi-coding-agent"; import { existsSync, readFileSync } from "fs"; -import { mkdir } from "fs/promises"; +import { mkdir, writeFile } from "fs/promises"; import { join } from "path"; import { MomSessionManager, MomSettingsManager } from "./context.js"; import * as log from "./log.js"; @@ -34,8 +34,15 @@ function toSlackTs(): string { return `${seconds}.${micros.toString().padStart(6, "0")}`; } +export interface PendingMessage { + userName: string; + text: string; + attachments: { local: string }[]; + timestamp: number; +} + export interface AgentRunner { - run(ctx: SlackContext, channelDir: string, store: ChannelStore): Promise<{ stopReason: string }>; + run(ctx: SlackContext, store: ChannelStore, pendingMessages?: PendingMessage[]): Promise<{ stopReason: string }>; abort(): void; } @@ -254,21 +261,250 @@ function formatToolArgsForSlack(_toolName: string, args: Record return lines.join("\n"); } -// Cache for AgentSession and SessionManager per channel -const channelSessions = new Map(); +// Cache runners per channel +const channelRunners = new Map(); -export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner { - let currentSession: AgentSession | null = null; +/** + * Get or create an AgentRunner for a channel. + * Runners are cached - one per channel, persistent across messages. + */ +export function getOrCreateRunner(sandboxConfig: SandboxConfig, channelId: string, channelDir: string): AgentRunner { + const existing = channelRunners.get(channelId); + if (existing) return existing; + + const runner = createRunner(sandboxConfig, channelId, channelDir); + channelRunners.set(channelId, runner); + return runner; +} + +/** + * Create a new AgentRunner for a channel. + * Sets up the session and subscribes to events once. + */ +function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDir: string): AgentRunner { const executor = createExecutor(sandboxConfig); + const workspacePath = executor.getWorkspacePath(channelDir.replace(`/${channelId}`, "")); + + // Create tools + const tools = createMomTools(executor); + + // Initial system prompt (will be updated each run with fresh memory/channels/users) + const memory = getMemory(channelDir); + const systemPrompt = buildSystemPrompt(workspacePath, channelId, memory, sandboxConfig, [], []); + + // Create session manager and settings manager + // Pass model info so new sessions get a header written immediately + const sessionManager = new MomSessionManager(channelDir, { + provider: model.provider, + id: model.id, + thinkingLevel: "off", + }); + const settingsManager = new MomSettingsManager(join(channelDir, "..")); + + // Create agent + const agent = new Agent({ + initialState: { + systemPrompt, + model, + thinkingLevel: "off", + tools, + }, + messageTransformer, + transport: new ProviderTransport({ + getApiKey: async () => getAnthropicApiKey(), + }), + }); + + // Load existing messages + const loadedSession = sessionManager.loadSession(); + if (loadedSession.messages.length > 0) { + agent.replaceMessages(loadedSession.messages); + log.logInfo(`[${channelId}] Loaded ${loadedSession.messages.length} messages from context.jsonl`); + } + + // Create AgentSession wrapper + const session = new AgentSession({ + agent, + sessionManager: sessionManager as any, + settingsManager: settingsManager as any, + }); + + // Mutable per-run state - event handler references this + const runState = { + ctx: null as SlackContext | null, + logCtx: null as { channelId: string; userName?: string; channelName?: string } | null, + queue: null as { + enqueue(fn: () => Promise, errorContext: string): void; + enqueueMessage(text: string, target: "main" | "thread", errorContext: string, doLog?: boolean): void; + } | null, + pendingTools: new Map(), + totalUsage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop", + }; + + // Subscribe to events ONCE + session.subscribe(async (event) => { + // Skip if no active run + if (!runState.ctx || !runState.logCtx || !runState.queue) return; + + const { ctx, logCtx, queue, pendingTools } = runState; + + if (event.type === "tool_execution_start") { + const agentEvent = event as AgentEvent & { type: "tool_execution_start" }; + const args = agentEvent.args as { label?: string }; + const label = args.label || agentEvent.toolName; + + pendingTools.set(agentEvent.toolCallId, { + toolName: agentEvent.toolName, + args: agentEvent.args, + startTime: Date.now(), + }); + + log.logToolStart(logCtx, agentEvent.toolName, label, agentEvent.args as Record); + queue.enqueue(() => ctx.respond(`_β†’ ${label}_`, false), "tool label"); + } else if (event.type === "tool_execution_end") { + const agentEvent = event as AgentEvent & { type: "tool_execution_end" }; + const resultStr = extractToolResultText(agentEvent.result); + const pending = pendingTools.get(agentEvent.toolCallId); + pendingTools.delete(agentEvent.toolCallId); + + const durationMs = pending ? Date.now() - pending.startTime : 0; + + if (agentEvent.isError) { + log.logToolError(logCtx, agentEvent.toolName, durationMs, resultStr); + } else { + log.logToolSuccess(logCtx, agentEvent.toolName, durationMs, resultStr); + } + + // Post args + result to thread + const label = pending?.args ? (pending.args as { label?: string }).label : undefined; + const argsFormatted = pending + ? formatToolArgsForSlack(agentEvent.toolName, pending.args as Record) + : "(args not found)"; + const duration = (durationMs / 1000).toFixed(1); + let threadMessage = `*${agentEvent.isError ? "βœ—" : "βœ“"} ${agentEvent.toolName}*`; + if (label) threadMessage += `: ${label}`; + threadMessage += ` (${duration}s)\n`; + if (argsFormatted) threadMessage += "```\n" + argsFormatted + "\n```\n"; + threadMessage += "*Result:*\n```\n" + resultStr + "\n```"; + + queue.enqueueMessage(threadMessage, "thread", "tool result thread", false); + + if (agentEvent.isError) { + queue.enqueue(() => ctx.respond(`_Error: ${truncate(resultStr, 200)}_`, false), "tool error"); + } + } else if (event.type === "message_start") { + const agentEvent = event as AgentEvent & { type: "message_start" }; + if (agentEvent.message.role === "assistant") { + log.logResponseStart(logCtx); + } + } else if (event.type === "message_end") { + const agentEvent = event as AgentEvent & { type: "message_end" }; + if (agentEvent.message.role === "assistant") { + const assistantMsg = agentEvent.message as any; + + if (assistantMsg.stopReason) { + runState.stopReason = assistantMsg.stopReason; + } + + if (assistantMsg.usage) { + runState.totalUsage.input += assistantMsg.usage.input; + runState.totalUsage.output += assistantMsg.usage.output; + runState.totalUsage.cacheRead += assistantMsg.usage.cacheRead; + runState.totalUsage.cacheWrite += assistantMsg.usage.cacheWrite; + runState.totalUsage.cost.input += assistantMsg.usage.cost.input; + runState.totalUsage.cost.output += assistantMsg.usage.cost.output; + runState.totalUsage.cost.cacheRead += assistantMsg.usage.cost.cacheRead; + runState.totalUsage.cost.cacheWrite += assistantMsg.usage.cost.cacheWrite; + runState.totalUsage.cost.total += assistantMsg.usage.cost.total; + } + + const content = agentEvent.message.content; + const thinkingParts: string[] = []; + const textParts: string[] = []; + for (const part of content) { + if (part.type === "thinking") { + thinkingParts.push((part as any).thinking); + } else if (part.type === "text") { + textParts.push((part as any).text); + } + } + + const text = textParts.join("\n"); + + for (const thinking of thinkingParts) { + log.logThinking(logCtx, thinking); + queue.enqueueMessage(`_${thinking}_`, "main", "thinking main"); + queue.enqueueMessage(`_${thinking}_`, "thread", "thinking thread", false); + } + + if (text.trim()) { + log.logResponse(logCtx, text); + queue.enqueueMessage(text, "main", "response main"); + queue.enqueueMessage(text, "thread", "response thread", false); + } + } + } else if (event.type === "auto_compaction_start") { + log.logInfo(`Auto-compaction started (reason: ${(event as any).reason})`); + queue.enqueue(() => ctx.respond("_Compacting context..._", false), "compaction start"); + } else if (event.type === "auto_compaction_end") { + const compEvent = event as any; + if (compEvent.result) { + log.logInfo(`Auto-compaction complete: ${compEvent.result.tokensBefore} tokens compacted`); + } else if (compEvent.aborted) { + log.logInfo("Auto-compaction aborted"); + } + } else if (event.type === "auto_retry_start") { + const retryEvent = event as any; + log.logWarning(`Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})`, retryEvent.errorMessage); + queue.enqueue( + () => ctx.respond(`_Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})..._`, false), + "retry", + ); + } + }); + + // Slack message limit + const SLACK_MAX_LENGTH = 40000; + const splitForSlack = (text: string): string[] => { + if (text.length <= SLACK_MAX_LENGTH) return [text]; + const parts: string[] = []; + let remaining = text; + let partNum = 1; + while (remaining.length > 0) { + const chunk = remaining.substring(0, SLACK_MAX_LENGTH - 50); + remaining = remaining.substring(SLACK_MAX_LENGTH - 50); + const suffix = remaining.length > 0 ? `\n_(continued ${partNum}...)_` : ""; + parts.push(chunk + suffix); + partNum++; + } + return parts; + }; return { - async run(ctx: SlackContext, channelDir: string, _store: ChannelStore): Promise<{ stopReason: string }> { + async run( + ctx: SlackContext, + _store: ChannelStore, + _pendingMessages?: PendingMessage[], + ): Promise<{ stopReason: string }> { // Ensure channel directory exists await mkdir(channelDir, { recursive: true }); - const channelId = ctx.message.channel; - const workspacePath = executor.getWorkspacePath(channelDir.replace(`/${channelId}`, "")); + // Reload messages from context.jsonl + // This picks up any messages synced from log.jsonl before this run + const reloadedSession = sessionManager.loadSession(); + if (reloadedSession.messages.length > 0) { + agent.replaceMessages(reloadedSession.messages); + log.logInfo(`[${channelId}] Reloaded ${reloadedSession.messages.length} messages from context`); + } + // Update system prompt with fresh memory and channel/user info const memory = getMemory(channelDir); const systemPrompt = buildSystemPrompt( workspacePath, @@ -278,123 +514,36 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner { ctx.channels, ctx.users, ); + session.agent.setSystemPrompt(systemPrompt); - // Debug: log context sizes - log.logInfo(`Context sizes - system: ${systemPrompt.length} chars, memory: ${memory.length} chars`); - log.logInfo(`Channels: ${ctx.channels.length}, Users: ${ctx.users.length}`); - - // Set up file upload function for the attach tool + // Set up file upload function setUploadFunction(async (filePath: string, title?: string) => { const hostPath = translateToHostPath(filePath, channelDir, workspacePath, channelId); await ctx.uploadFile(hostPath, title); }); - // Create tools with executor - const tools = createMomTools(executor); - - // Get or create AgentSession for this channel - const cached = channelSessions.get(channelId); - let session: AgentSession; - let sessionManager: MomSessionManager; - - if (!cached) { - // Create session manager and settings manager - sessionManager = new MomSessionManager(channelDir); - const settingsManager = new MomSettingsManager(join(channelDir, "..")); - - // Create agent with proper message transformer for compaction - const agent = new Agent({ - initialState: { - systemPrompt, - model, - thinkingLevel: "off", - tools, - }, - messageTransformer, - transport: new ProviderTransport({ - getApiKey: async () => getAnthropicApiKey(), - }), - }); - - // Load existing messages from session - const loadedSession = sessionManager.loadSession(); - if (loadedSession.messages.length > 0) { - agent.replaceMessages(loadedSession.messages); - log.logInfo(`Loaded ${loadedSession.messages.length} messages from context.jsonl`); - } - - // Create AgentSession wrapper - session = new AgentSession({ - agent, - sessionManager: sessionManager as any, // Type compatibility - settingsManager: settingsManager as any, // Type compatibility - }); - - channelSessions.set(channelId, { session, sessionManager }); - } else { - session = cached.session; - sessionManager = cached.sessionManager; - - // Update system prompt for existing session (memory may have changed) - session.agent.setSystemPrompt(systemPrompt); - } - - // Sync messages from log.jsonl to context.jsonl - // Exclude the current message - it will be added via prompt() - sessionManager.syncFromLog(ctx.message.ts); - - currentSession = session; - - // Create logging context - const logCtx = { + // Reset per-run state + runState.ctx = ctx; + runState.logCtx = { channelId: ctx.message.channel, userName: ctx.message.userName, channelName: ctx.channelName, }; - - // Track pending tool calls to pair args with results and timing - const pendingTools = new Map(); - - // Track usage across all assistant messages in this run - const totalUsage = { + runState.pendingTools.clear(); + runState.totalUsage = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, - cost: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - total: 0, - }, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, }; + runState.stopReason = "stop"; - // Track stop reason - let stopReason = "stop"; - - // Slack message limit is 40,000 characters - const SLACK_MAX_LENGTH = 40000; - const splitForSlack = (text: string): string[] => { - if (text.length <= SLACK_MAX_LENGTH) return [text]; - const parts: string[] = []; - let remaining = text; - let partNum = 1; - while (remaining.length > 0) { - const chunk = remaining.substring(0, SLACK_MAX_LENGTH - 50); - remaining = remaining.substring(SLACK_MAX_LENGTH - 50); - const suffix = remaining.length > 0 ? `\n_(continued ${partNum}...)_` : ""; - parts.push(chunk + suffix); - partNum++; - } - return parts; - }; - - // Promise queue to ensure ctx.respond/respondInThread calls execute in order - const queue = { - chain: Promise.resolve(), + // Create queue for this run + let queueChain = Promise.resolve(); + runState.queue = { enqueue(fn: () => Promise, errorContext: string): void { - this.chain = this.chain.then(async () => { + queueChain = queueChain.then(async () => { try { await fn(); } catch (err) { @@ -417,188 +566,74 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner { ); } }, - flush(): Promise { - return this.chain; - }, }; - // Subscribe to session events - const unsubscribe = session.subscribe(async (event) => { - // Handle agent events - if (event.type === "tool_execution_start") { - const agentEvent = event as AgentEvent & { type: "tool_execution_start" }; - const args = agentEvent.args as { label?: string }; - const label = args.label || agentEvent.toolName; + // Log context info + log.logInfo(`Context sizes - system: ${systemPrompt.length} chars, memory: ${memory.length} chars`); + log.logInfo(`Channels: ${ctx.channels.length}, Users: ${ctx.users.length}`); - pendingTools.set(agentEvent.toolCallId, { - toolName: agentEvent.toolName, - args: agentEvent.args, - startTime: Date.now(), - }); + // Build user message with username prefix + // Format: "[username]: message" so LLM knows who's talking + let userMessage = `[${ctx.message.userName || "unknown"}]: ${ctx.message.text}`; - log.logToolStart(logCtx, agentEvent.toolName, label, agentEvent.args as Record); - - // NOTE: Tool results are NOT logged to log.jsonl anymore - // They are stored in context.jsonl via AgentSession - - queue.enqueue(() => ctx.respond(`_β†’ ${label}_`, false), "tool label"); - } else if (event.type === "tool_execution_end") { - const agentEvent = event as AgentEvent & { type: "tool_execution_end" }; - const resultStr = extractToolResultText(agentEvent.result); - const pending = pendingTools.get(agentEvent.toolCallId); - pendingTools.delete(agentEvent.toolCallId); - - const durationMs = pending ? Date.now() - pending.startTime : 0; - - if (agentEvent.isError) { - log.logToolError(logCtx, agentEvent.toolName, durationMs, resultStr); - } else { - log.logToolSuccess(logCtx, agentEvent.toolName, durationMs, resultStr); - } - - // Post args + result to thread (for debugging) - const label = pending?.args ? (pending.args as { label?: string }).label : undefined; - const argsFormatted = pending - ? formatToolArgsForSlack(agentEvent.toolName, pending.args as Record) - : "(args not found)"; - const duration = (durationMs / 1000).toFixed(1); - let threadMessage = `*${agentEvent.isError ? "βœ—" : "βœ“"} ${agentEvent.toolName}*`; - if (label) { - threadMessage += `: ${label}`; - } - threadMessage += ` (${duration}s)\n`; - - if (argsFormatted) { - threadMessage += "```\n" + argsFormatted + "\n```\n"; - } - - threadMessage += "*Result:*\n```\n" + resultStr + "\n```"; - - queue.enqueueMessage(threadMessage, "thread", "tool result thread", false); - - if (agentEvent.isError) { - queue.enqueue(() => ctx.respond(`_Error: ${truncate(resultStr, 200)}_`, false), "tool error"); - } - } else if (event.type === "message_start") { - const agentEvent = event as AgentEvent & { type: "message_start" }; - if (agentEvent.message.role === "assistant") { - log.logResponseStart(logCtx); - } - } else if (event.type === "message_end") { - const agentEvent = event as AgentEvent & { type: "message_end" }; - if (agentEvent.message.role === "assistant") { - const assistantMsg = agentEvent.message as any; - - if (assistantMsg.stopReason) { - stopReason = assistantMsg.stopReason; - } - - if (assistantMsg.usage) { - totalUsage.input += assistantMsg.usage.input; - totalUsage.output += assistantMsg.usage.output; - totalUsage.cacheRead += assistantMsg.usage.cacheRead; - totalUsage.cacheWrite += assistantMsg.usage.cacheWrite; - totalUsage.cost.input += assistantMsg.usage.cost.input; - totalUsage.cost.output += assistantMsg.usage.cost.output; - totalUsage.cost.cacheRead += assistantMsg.usage.cost.cacheRead; - totalUsage.cost.cacheWrite += assistantMsg.usage.cost.cacheWrite; - totalUsage.cost.total += assistantMsg.usage.cost.total; - } - - const content = agentEvent.message.content; - const thinkingParts: string[] = []; - const textParts: string[] = []; - for (const part of content) { - if (part.type === "thinking") { - thinkingParts.push((part as any).thinking); - } else if (part.type === "text") { - textParts.push((part as any).text); - } - } - - const text = textParts.join("\n"); - - for (const thinking of thinkingParts) { - log.logThinking(logCtx, thinking); - queue.enqueueMessage(`_${thinking}_`, "main", "thinking main"); - queue.enqueueMessage(`_${thinking}_`, "thread", "thinking thread", false); - } - - if (text.trim()) { - log.logResponse(logCtx, text); - queue.enqueueMessage(text, "main", "response main"); - queue.enqueueMessage(text, "thread", "response thread", false); - } - } - } else if (event.type === "auto_compaction_start") { - log.logInfo(`Auto-compaction started (reason: ${(event as any).reason})`); - queue.enqueue(() => ctx.respond("_Compacting context..._", false), "compaction start"); - } else if (event.type === "auto_compaction_end") { - const compEvent = event as any; - if (compEvent.result) { - log.logInfo(`Auto-compaction complete: ${compEvent.result.tokensBefore} tokens compacted`); - } else if (compEvent.aborted) { - log.logInfo("Auto-compaction aborted"); - } - } else if (event.type === "auto_retry_start") { - const retryEvent = event as any; - log.logWarning(`Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})`, retryEvent.errorMessage); - queue.enqueue( - () => ctx.respond(`_Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})..._`, false), - "retry", - ); - } - }); - - try { - // Build user message from Slack context - // Note: User message is already logged to log.jsonl by Slack event handler - const userMessage = ctx.message.text; - - // Send prompt to agent session - await session.prompt(userMessage); - - // Wait for all queued Slack messages - await queue.flush(); - - // Get final assistant text and update main message - const messages = session.messages; - const lastAssistant = messages.filter((m) => m.role === "assistant").pop(); - const finalText = - lastAssistant?.content - .filter((c): c is { type: "text"; text: string } => c.type === "text") - .map((c) => c.text) - .join("\n") || ""; - - if (finalText.trim()) { - // Note: Bot response is logged via ctx.respond() in the event handler - try { - const mainText = - finalText.length > SLACK_MAX_LENGTH - ? finalText.substring(0, SLACK_MAX_LENGTH - 50) + "\n\n_(see thread for full response)_" - : finalText; - await ctx.replaceMessage(mainText); - } catch (err) { - const errMsg = err instanceof Error ? err.message : String(err); - log.logWarning("Failed to replace message with final text", errMsg); - } - } - - // Log usage summary - if (totalUsage.cost.total > 0) { - const summary = log.logUsageSummary(logCtx, totalUsage); - queue.enqueue(() => ctx.respondInThread(summary), "usage summary"); - await queue.flush(); - } - - return { stopReason }; - } finally { - unsubscribe(); + // Add attachment paths if any + if (ctx.message.attachments && ctx.message.attachments.length > 0) { + const attachmentPaths = ctx.message.attachments.map((a) => a.local).join("\n"); + userMessage += `\n\nAttachments:\n${attachmentPaths}`; } + + // Debug: write context to last_prompt.jsonl + const debugContext = { + systemPrompt, + messages: session.messages, + newUserMessage: userMessage, + }; + await writeFile(join(channelDir, "last_prompt.jsonl"), JSON.stringify(debugContext, null, 2)); + + await session.prompt(userMessage); + + // Wait for queued messages + await queueChain; + + // Final message update + const messages = session.messages; + const lastAssistant = messages.filter((m) => m.role === "assistant").pop(); + const finalText = + lastAssistant?.content + .filter((c): c is { type: "text"; text: string } => c.type === "text") + .map((c) => c.text) + .join("\n") || ""; + + if (finalText.trim()) { + try { + const mainText = + finalText.length > SLACK_MAX_LENGTH + ? finalText.substring(0, SLACK_MAX_LENGTH - 50) + "\n\n_(see thread for full response)_" + : finalText; + await ctx.replaceMessage(mainText); + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + log.logWarning("Failed to replace message with final text", errMsg); + } + } + + // Log usage summary + if (runState.totalUsage.cost.total > 0) { + const summary = log.logUsageSummary(runState.logCtx!, runState.totalUsage); + runState.queue.enqueue(() => ctx.respondInThread(summary), "usage summary"); + await queueChain; + } + + // Clear run state + runState.ctx = null; + runState.logCtx = null; + runState.queue = null; + + return { stopReason: runState.stopReason }; }, abort(): void { - currentSession?.abort(); + session.abort(); }, }; } diff --git a/packages/mom/src/context.ts b/packages/mom/src/context.ts index 7bba807d..d2f4d62f 100644 --- a/packages/mom/src/context.ts +++ b/packages/mom/src/context.ts @@ -52,7 +52,7 @@ export class MomSessionManager { private inMemoryEntries: SessionEntry[] = []; private pendingEntries: SessionEntry[] = []; - constructor(channelDir: string) { + constructor(channelDir: string, initialModel?: { provider: string; id: string; thinkingLevel?: string }) { this.channelDir = channelDir; this.contextFile = join(channelDir, "context.jsonl"); this.logFile = join(channelDir, "log.jsonl"); @@ -68,11 +68,33 @@ export class MomSessionManager { this.sessionId = this.extractSessionId() || uuidv4(); this.sessionInitialized = this.inMemoryEntries.length > 0; } else { + // New session - write header immediately this.sessionId = uuidv4(); + if (initialModel) { + this.writeSessionHeader(initialModel); + } } // Note: syncFromLog() is called explicitly from agent.ts with excludeTimestamp } + /** Write session header to file (called on new session creation) */ + private writeSessionHeader(model: { provider: string; id: string; thinkingLevel?: string }): void { + this.sessionInitialized = true; + + const entry: SessionHeader = { + type: "session", + id: this.sessionId, + timestamp: new Date().toISOString(), + cwd: this.channelDir, + provider: model.provider, + modelId: model.id, + thinkingLevel: model.thinkingLevel || "off", + }; + + this.inMemoryEntries.push(entry); + appendFileSync(this.contextFile, JSON.stringify(entry) + "\n"); + } + /** * Sync user messages from log.jsonl that aren't in context.jsonl. * @@ -84,18 +106,48 @@ export class MomSessionManager { * * Channel chatter is formatted as "[username]: message" to distinguish from direct @mentions. * - * Called automatically on construction and should be called before each agent run. + * Called before each agent run. * - * @param excludeTimestamp Optional timestamp to exclude (for the current @mention being processed) + * @param excludeSlackTs Slack timestamp of current message (will be added via prompt(), not sync) */ - syncFromLog(excludeTimestamp?: string): void { + syncFromLog(excludeSlackTs?: string): void { if (!existsSync(this.logFile)) return; - // Get timestamps of messages already in context - const contextTimestamps = new Set(); + // Build set of Slack timestamps already in context + // We store slackTs in the message content or can extract from formatted messages + // For messages synced from log, we use the log's date as the entry timestamp + // For messages added via prompt(), they have different timestamps + // So we need to match by content OR by stored slackTs + const contextSlackTimestamps = new Set(); + const contextMessageTexts = new Set(); + for (const entry of this.inMemoryEntries) { if (entry.type === "message") { - contextTimestamps.add(entry.timestamp); + const msgEntry = entry as SessionMessageEntry; + // Store the entry timestamp (which is the log date for synced messages) + contextSlackTimestamps.add(entry.timestamp); + + // Also store message text to catch duplicates added via prompt() + // AppMessage has different shapes, check for content property + const msg = msgEntry.message as { role: string; content?: unknown }; + if (msg.role === "user" && msg.content !== undefined) { + const content = msg.content; + if (typeof content === "string") { + contextMessageTexts.add(content); + } else if (Array.isArray(content)) { + for (const part of content) { + if ( + typeof part === "object" && + part !== null && + "type" in part && + part.type === "text" && + "text" in part + ) { + contextMessageTexts.add((part as { type: "text"; text: string }).text); + } + } + } + } } } @@ -112,34 +164,39 @@ export class MomSessionManager { isBot?: boolean; } - const newMessages: Array<{ timestamp: string; message: AppMessage }> = []; + const newMessages: Array<{ timestamp: string; slackTs: string; message: AppMessage }> = []; for (const line of logLines) { try { const logMsg: LogMessage = JSON.parse(line); - // Use date for context timestamp (consistent key) - const ts = logMsg.date || logMsg.ts; - if (!ts) continue; - - // Skip if already in context - if (contextTimestamps.has(ts)) continue; + const slackTs = logMsg.ts; + const date = logMsg.date; + if (!slackTs || !date) continue; // Skip the current message being processed (will be added via prompt()) - // Compare against Slack ts since that's what ctx.message.ts provides - if (excludeTimestamp && logMsg.ts === excludeTimestamp) continue; + if (excludeSlackTs && slackTs === excludeSlackTs) continue; // Skip bot messages - added through agent flow if (logMsg.isBot) continue; - const msgTime = new Date(ts).getTime() || Date.now(); + // Skip if this date is already in context (was synced before) + if (contextSlackTimestamps.has(date)) continue; + + // Build the message text as it would appear in context + const messageText = `[${logMsg.userName || logMsg.user || "unknown"}]: ${logMsg.text || ""}`; + + // Skip if this exact message text is already in context (added via prompt()) + if (contextMessageTexts.has(messageText)) continue; + + const msgTime = new Date(date).getTime() || Date.now(); const userMessage: AppMessage = { role: "user", - content: `[${logMsg.userName || logMsg.user || "unknown"}]: ${logMsg.text || ""}`, + content: messageText, timestamp: msgTime, }; - newMessages.push({ timestamp: ts, message: userMessage }); + newMessages.push({ timestamp: date, slackTs, message: userMessage }); } catch { // Skip malformed lines } @@ -153,15 +210,13 @@ export class MomSessionManager { for (const { timestamp, message } of newMessages) { const entry: SessionMessageEntry = { type: "message", - timestamp, + timestamp, // Use log date as entry timestamp for consistent deduplication message, }; this.inMemoryEntries.push(entry); appendFileSync(this.contextFile, JSON.stringify(entry) + "\n"); } - - // Sync complete - newMessages.length messages added } private extractSessionId(): string | null { @@ -483,3 +538,118 @@ export class MomSettingsManager { return 30000; } } + +// ============================================================================ +// Sync log.jsonl to context.jsonl +// ============================================================================ + +/** + * Sync user messages from log.jsonl to context.jsonl. + * + * This ensures that messages logged while mom wasn't running (channel chatter, + * backfilled messages, messages while busy) are added to the LLM context. + * + * @param channelDir - Path to channel directory + * @param excludeAfterTs - Don't sync messages with ts >= this value (they'll be handled by agent) + * @returns Number of messages synced + */ +export function syncLogToContext(channelDir: string, excludeAfterTs?: string): number { + const logFile = join(channelDir, "log.jsonl"); + const contextFile = join(channelDir, "context.jsonl"); + + if (!existsSync(logFile)) return 0; + + // Read all user messages from log.jsonl + const logContent = readFileSync(logFile, "utf-8"); + const logLines = logContent.trim().split("\n").filter(Boolean); + + interface LogEntry { + ts: string; + user: string; + userName?: string; + text: string; + isBot: boolean; + } + + const logMessages: LogEntry[] = []; + for (const line of logLines) { + try { + const entry = JSON.parse(line) as LogEntry; + // Only sync user messages (not bot responses) + if (!entry.isBot && entry.ts && entry.text) { + // Skip if >= excludeAfterTs + if (excludeAfterTs && entry.ts >= excludeAfterTs) continue; + logMessages.push(entry); + } + } catch {} + } + + if (logMessages.length === 0) return 0; + + // Read existing timestamps from context.jsonl + const existingTs = new Set(); + if (existsSync(contextFile)) { + const contextContent = readFileSync(contextFile, "utf-8"); + const contextLines = contextContent.trim().split("\n").filter(Boolean); + for (const line of contextLines) { + try { + const entry = JSON.parse(line); + if (entry.type === "message" && entry.message?.role === "user" && entry.message?.timestamp) { + // Extract ts from timestamp (ms -> slack ts format for comparison) + // We store the original slack ts in a way we can recover + // Actually, let's just check by content match since ts formats differ + } + } catch {} + } + } + + // For deduplication, we need to track what's already in context + // Read context and extract user message content + const existingMessages = new Set(); + if (existsSync(contextFile)) { + const contextContent = readFileSync(contextFile, "utf-8"); + const contextLines = contextContent.trim().split("\n").filter(Boolean); + for (const line of contextLines) { + try { + const entry = JSON.parse(line); + if (entry.type === "message" && entry.message?.role === "user") { + const content = + typeof entry.message.content === "string" ? entry.message.content : entry.message.content?.[0]?.text; + if (content) existingMessages.add(content); + } + } catch {} + } + } + + // Add missing messages to context.jsonl + let syncedCount = 0; + for (const msg of logMessages) { + const userName = msg.userName || msg.user; + const content = `[${userName}]: ${msg.text}`; + + // Skip if already in context + if (existingMessages.has(content)) continue; + + const timestamp = Math.floor(parseFloat(msg.ts) * 1000); + const entry = { + type: "message", + timestamp: new Date(timestamp).toISOString(), + message: { + role: "user", + content, + timestamp, + }, + }; + + // Ensure directory exists + if (!existsSync(channelDir)) { + mkdirSync(channelDir, { recursive: true }); + } + + appendFileSync(contextFile, JSON.stringify(entry) + "\n"); + existingMessages.add(content); // Track to avoid duplicates within this sync + syncedCount++; + } + + return syncedCount; +} diff --git a/packages/mom/src/main.ts b/packages/mom/src/main.ts index cc839fe3..d56ce0a8 100644 --- a/packages/mom/src/main.ts +++ b/packages/mom/src/main.ts @@ -1,17 +1,22 @@ #!/usr/bin/env node import { join, resolve } from "path"; -import { type AgentRunner, createAgentRunner } from "./agent.js"; +import { type AgentRunner, getOrCreateRunner } from "./agent.js"; +import { syncLogToContext } from "./context.js"; import * as log from "./log.js"; import { parseSandboxArg, type SandboxConfig, validateSandbox } from "./sandbox.js"; -import { MomBot, type SlackContext } from "./slack.js"; +import { type MomHandler, type SlackBot, SlackBot as SlackBotClass, type SlackEvent } from "./slack.js"; +import { ChannelStore } from "./store.js"; + +// ============================================================================ +// Config +// ============================================================================ const MOM_SLACK_APP_TOKEN = process.env.MOM_SLACK_APP_TOKEN; const MOM_SLACK_BOT_TOKEN = process.env.MOM_SLACK_BOT_TOKEN; const ANTHROPIC_API_KEY = process.env.ANTHROPIC_API_KEY; const ANTHROPIC_OAUTH_TOKEN = process.env.ANTHROPIC_OAUTH_TOKEN; -// Parse command line arguments function parseArgs(): { workingDir: string; sandbox: SandboxConfig } { const args = process.argv.slice(2); let sandbox: SandboxConfig = { type: "host" }; @@ -22,30 +27,14 @@ function parseArgs(): { workingDir: string; sandbox: SandboxConfig } { if (arg.startsWith("--sandbox=")) { sandbox = parseSandboxArg(arg.slice("--sandbox=".length)); } else if (arg === "--sandbox") { - const next = args[++i]; - if (!next) { - console.error("Error: --sandbox requires a value (host or docker:)"); - process.exit(1); - } - sandbox = parseSandboxArg(next); + sandbox = parseSandboxArg(args[++i] || ""); } else if (!arg.startsWith("-")) { workingDir = arg; - } else { - console.error(`Unknown option: ${arg}`); - process.exit(1); } } if (!workingDir) { - console.error("Usage: mom [--sandbox=host|docker:] "); - console.error(""); - console.error("Options:"); - console.error(" --sandbox=host Run tools directly on host (default)"); - console.error(" --sandbox=docker: Run tools in Docker container"); - console.error(""); - console.error("Examples:"); - console.error(" mom ./data"); - console.error(" mom --sandbox=docker:mom-sandbox ./data"); + console.error("Usage: mom [--sandbox=host|docker:] "); process.exit(1); } @@ -54,101 +43,210 @@ function parseArgs(): { workingDir: string; sandbox: SandboxConfig } { const { workingDir, sandbox } = parseArgs(); -log.logStartup(workingDir, sandbox.type === "host" ? "host" : `docker:${sandbox.container}`); - if (!MOM_SLACK_APP_TOKEN || !MOM_SLACK_BOT_TOKEN || (!ANTHROPIC_API_KEY && !ANTHROPIC_OAUTH_TOKEN)) { - console.error("Missing required environment variables:"); - if (!MOM_SLACK_APP_TOKEN) console.error(" - MOM_SLACK_APP_TOKEN (xapp-...)"); - if (!MOM_SLACK_BOT_TOKEN) console.error(" - MOM_SLACK_BOT_TOKEN (xoxb-...)"); - if (!ANTHROPIC_API_KEY && !ANTHROPIC_OAUTH_TOKEN) console.error(" - ANTHROPIC_API_KEY or ANTHROPIC_OAUTH_TOKEN"); + console.error("Missing env: MOM_SLACK_APP_TOKEN, MOM_SLACK_BOT_TOKEN, ANTHROPIC_API_KEY or ANTHROPIC_OAUTH_TOKEN"); process.exit(1); } -// Validate sandbox configuration await validateSandbox(sandbox); -// Track active agent runs per channel -const activeRuns = new Map(); +// ============================================================================ +// State (per channel) +// ============================================================================ -async function handleMessage(ctx: SlackContext, _source: "channel" | "dm"): Promise { - const channelId = ctx.message.channel; - const messageText = ctx.message.text.toLowerCase().trim(); - - const logCtx = { - channelId: ctx.message.channel, - userName: ctx.message.userName, - channelName: ctx.channelName, - }; - - // Check for stop command - if (messageText === "stop") { - const active = activeRuns.get(channelId); - if (active) { - log.logStopRequest(logCtx); - // Post a NEW message saying "Stopping..." - await ctx.respond("_Stopping..._"); - // Store this context to update it to "Stopped" later - active.stopContext = ctx; - // Abort the runner - active.runner.abort(); - } else { - await ctx.respond("_Nothing running._"); - } - return; - } - - // Check if already running in this channel - if (activeRuns.has(channelId)) { - await ctx.respond("_Already working on something. Say `@mom stop` to cancel._"); - return; - } - - log.logUserMessage(logCtx, ctx.message.text); - const channelDir = join(workingDir, channelId); - - const runner = createAgentRunner(sandbox); - activeRuns.set(channelId, { runner, context: ctx }); - - await ctx.setTyping(true); - await ctx.setWorking(true); - - const result = await runner.run(ctx, channelDir, ctx.store); - - // Remove working indicator - await ctx.setWorking(false); - - // Handle different stop reasons - const active = activeRuns.get(channelId); - if (result.stopReason === "aborted") { - // Replace the STOP message with "Stopped" - if (active?.stopContext) { - await active.stopContext.setWorking(false); - await active.stopContext.replaceMessage("_Stopped_"); - } - } else if (result.stopReason === "error") { - // Agent encountered an error - log.logAgentError(logCtx, "Agent stopped with error"); - } - // "stop", "length", "toolUse" are normal completions - nothing extra to do - - activeRuns.delete(channelId); +interface ChannelState { + running: boolean; + runner: AgentRunner; + store: ChannelStore; + stopRequested: boolean; + stopMessageTs?: string; } -const bot = new MomBot( - { - async onChannelMention(ctx) { - await handleMessage(ctx, "channel"); +const channelStates = new Map(); + +function getState(channelId: string): ChannelState { + let state = channelStates.get(channelId); + if (!state) { + const channelDir = join(workingDir, channelId); + state = { + running: false, + runner: getOrCreateRunner(sandbox, channelId, channelDir), + store: new ChannelStore({ workingDir, botToken: MOM_SLACK_BOT_TOKEN! }), + stopRequested: false, + }; + channelStates.set(channelId, state); + } + return state; +} + +// ============================================================================ +// Create SlackContext adapter +// ============================================================================ + +function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelState) { + let messageTs: string | null = null; + let accumulatedText = ""; + let isWorking = true; + const workingIndicator = " ..."; + let updatePromise = Promise.resolve(); + + const user = slack.getUser(event.user); + + return { + message: { + text: event.text, + rawText: event.text, + user: event.user, + userName: user?.userName, + channel: event.channel, + ts: event.ts, + attachments: [], + }, + channelName: slack.getChannel(event.channel)?.name, + store: state.store, + channels: slack.getAllChannels().map((c) => ({ id: c.id, name: c.name })), + users: slack.getAllUsers().map((u) => ({ id: u.id, userName: u.userName, displayName: u.displayName })), + + respond: async (text: string, shouldLog = true) => { + updatePromise = updatePromise.then(async () => { + accumulatedText = accumulatedText ? accumulatedText + "\n" + text : text; + const displayText = isWorking ? accumulatedText + workingIndicator : accumulatedText; + + if (messageTs) { + await slack.updateMessage(event.channel, messageTs, displayText); + } else { + messageTs = await slack.postMessage(event.channel, displayText); + } + + if (shouldLog && messageTs) { + slack.logBotResponse(event.channel, text, messageTs); + } + }); + await updatePromise; }, - async onDirectMessage(ctx) { - await handleMessage(ctx, "dm"); + replaceMessage: async (text: string) => { + updatePromise = updatePromise.then(async () => { + accumulatedText = text; + const displayText = isWorking ? accumulatedText + workingIndicator : accumulatedText; + if (messageTs) { + await slack.updateMessage(event.channel, messageTs, displayText); + } else { + messageTs = await slack.postMessage(event.channel, displayText); + } + }); + await updatePromise; }, + + respondInThread: async (text: string) => { + updatePromise = updatePromise.then(async () => { + if (messageTs) { + await slack.postInThread(event.channel, messageTs, text); + } + }); + await updatePromise; + }, + + setTyping: async (isTyping: boolean) => { + if (isTyping && !messageTs) { + accumulatedText = "_Thinking_"; + messageTs = await slack.postMessage(event.channel, accumulatedText + workingIndicator); + } + }, + + uploadFile: async (filePath: string, title?: string) => { + await slack.uploadFile(event.channel, filePath, title); + }, + + setWorking: async (working: boolean) => { + updatePromise = updatePromise.then(async () => { + isWorking = working; + if (messageTs) { + const displayText = isWorking ? accumulatedText + workingIndicator : accumulatedText; + await slack.updateMessage(event.channel, messageTs, displayText); + } + }); + await updatePromise; + }, + }; +} + +// ============================================================================ +// Handler +// ============================================================================ + +const handler: MomHandler = { + isRunning(channelId: string): boolean { + const state = channelStates.get(channelId); + return state?.running ?? false; }, - { - appToken: MOM_SLACK_APP_TOKEN, - botToken: MOM_SLACK_BOT_TOKEN, - workingDir, + + async handleStop(channelId: string, slack: SlackBot): Promise { + const state = channelStates.get(channelId); + if (state?.running) { + state.stopRequested = true; + state.runner.abort(); + const ts = await slack.postMessage(channelId, "_Stopping..._"); + state.stopMessageTs = ts; // Save for updating later + } else { + await slack.postMessage(channelId, "_Nothing running_"); + } }, -); + + async handleEvent(event: SlackEvent, slack: SlackBot): Promise { + const state = getState(event.channel); + const channelDir = join(workingDir, event.channel); + + // Start run + state.running = true; + state.stopRequested = false; + + log.logInfo(`[${event.channel}] Starting run: ${event.text.substring(0, 50)}`); + + try { + // SYNC context from log.jsonl BEFORE processing + // This adds any messages that were logged while mom wasn't running + // Exclude messages >= current ts (will be handled by agent) + const syncedCount = syncLogToContext(channelDir, event.ts); + if (syncedCount > 0) { + log.logInfo(`[${event.channel}] Synced ${syncedCount} messages from log to context`); + } + + // Create context adapter + const ctx = createSlackContext(event, slack, state); + + // Run the agent + await ctx.setTyping(true); + await ctx.setWorking(true); + const result = await state.runner.run(ctx as any, state.store); + await ctx.setWorking(false); + + if (result.stopReason === "aborted" && state.stopRequested) { + if (state.stopMessageTs) { + await slack.updateMessage(event.channel, state.stopMessageTs, "_Stopped_"); + state.stopMessageTs = undefined; + } else { + await slack.postMessage(event.channel, "_Stopped_"); + } + } + } catch (err) { + log.logWarning(`[${event.channel}] Run error`, err instanceof Error ? err.message : String(err)); + } finally { + state.running = false; + } + }, +}; + +// ============================================================================ +// Start +// ============================================================================ + +log.logStartup(workingDir, sandbox.type === "host" ? "host" : `docker:${sandbox.container}`); + +const bot = new SlackBotClass(handler, { + appToken: MOM_SLACK_APP_TOKEN, + botToken: MOM_SLACK_BOT_TOKEN, + workingDir, +}); bot.start(); diff --git a/packages/mom/src/slack.ts b/packages/mom/src/slack.ts index aa24e667..c36747cf 100644 --- a/packages/mom/src/slack.ts +++ b/packages/mom/src/slack.ts @@ -1,53 +1,34 @@ import { SocketModeClient } from "@slack/socket-mode"; -import { type ConversationsHistoryResponse, WebClient } from "@slack/web-api"; -import { readFileSync } from "fs"; -import { basename } from "path"; +import { WebClient } from "@slack/web-api"; +import { appendFileSync, existsSync, mkdirSync, readFileSync } from "fs"; +import { basename, join } from "path"; import * as log from "./log.js"; -import { type Attachment, ChannelStore } from "./store.js"; -export interface SlackMessage { - text: string; // message content (mentions stripped) - rawText: string; // original text with mentions - user: string; // user ID - userName?: string; // user handle - channel: string; // channel ID - ts: string; // timestamp (for threading) - attachments: Attachment[]; // file attachments +// ============================================================================ +// Types +// ============================================================================ + +export interface SlackEvent { + type: "mention" | "dm"; + channel: string; + ts: string; + user: string; + text: string; + files?: Array<{ name: string; url_private_download?: string; url_private?: string }>; } -export interface SlackContext { - message: SlackMessage; - channelName?: string; // channel name for logging (e.g., #dev-team) - store: ChannelStore; - /** All channels the bot is a member of */ - channels: ChannelInfo[]; - /** All known users in the workspace */ - users: UserInfo[]; - /** Send/update the main message (accumulates text). Set log=false to skip logging. */ - respond(text: string, shouldLog?: boolean): Promise; - /** Replace the entire message text (not append) */ - replaceMessage(text: string): Promise; - /** Post a message in the thread under the main message (for verbose details) */ - respondInThread(text: string): Promise; - /** Show/hide typing indicator */ - setTyping(isTyping: boolean): Promise; - /** Upload a file to the channel */ - uploadFile(filePath: string, title?: string): Promise; - /** Set working state (adds/removes working indicator emoji) */ - setWorking(working: boolean): Promise; +export interface SlackUser { + id: string; + userName: string; + displayName: string; } -export interface MomHandler { - onChannelMention(ctx: SlackContext): Promise; - onDirectMessage(ctx: SlackContext): Promise; -} - -export interface MomBotConfig { - appToken: string; - botToken: string; - workingDir: string; // directory for channel data and attachments +export interface SlackChannel { + id: string; + name: string; } +// Types used by agent.ts export interface ChannelInfo { id: string; name: string; @@ -59,159 +40,201 @@ export interface UserInfo { displayName: string; } -export class MomBot { +export interface SlackContext { + message: { + text: string; + rawText: string; + user: string; + userName?: string; + channel: string; + ts: string; + attachments: Array<{ local: string }>; + }; + channelName?: string; + channels: ChannelInfo[]; + users: UserInfo[]; + respond: (text: string, shouldLog?: boolean) => Promise; + replaceMessage: (text: string) => Promise; + respondInThread: (text: string) => Promise; + setTyping: (isTyping: boolean) => Promise; + uploadFile: (filePath: string, title?: string) => Promise; + setWorking: (working: boolean) => Promise; +} + +export interface MomHandler { + /** + * Check if channel is currently running (SYNC) + */ + isRunning(channelId: string): boolean; + + /** + * Handle an event that triggers mom (ASYNC) + * Called only when isRunning() returned false + */ + handleEvent(event: SlackEvent, slack: SlackBot): Promise; + + /** + * Handle stop command (ASYNC) + * Called when user says "stop" while mom is running + */ + handleStop(channelId: string, slack: SlackBot): Promise; +} + +// ============================================================================ +// Per-channel queue for sequential processing +// ============================================================================ + +type QueuedWork = () => Promise; + +class ChannelQueue { + private queue: QueuedWork[] = []; + private processing = false; + + enqueue(work: QueuedWork): void { + this.queue.push(work); + this.processNext(); + } + + private async processNext(): Promise { + if (this.processing || this.queue.length === 0) return; + this.processing = true; + const work = this.queue.shift()!; + try { + await work(); + } catch (err) { + log.logWarning("Queue error", err instanceof Error ? err.message : String(err)); + } + this.processing = false; + this.processNext(); + } +} + +// ============================================================================ +// SlackBot +// ============================================================================ + +export class SlackBot { private socketClient: SocketModeClient; private webClient: WebClient; private handler: MomHandler; + private workingDir: string; private botUserId: string | null = null; - public readonly store: ChannelStore; - private userCache: Map = new Map(); - private channelCache: Map = new Map(); // id -> name + private startupTs: string | null = null; // Messages older than this are just logged, not processed - constructor(handler: MomHandler, config: MomBotConfig) { + private users = new Map(); + private channels = new Map(); + private queues = new Map(); + + constructor(handler: MomHandler, config: { appToken: string; botToken: string; workingDir: string }) { this.handler = handler; + this.workingDir = config.workingDir; this.socketClient = new SocketModeClient({ appToken: config.appToken }); this.webClient = new WebClient(config.botToken); - this.store = new ChannelStore({ - workingDir: config.workingDir, - botToken: config.botToken, - }); + } + + // ========================================================================== + // Public API + // ========================================================================== + + async start(): Promise { + const auth = await this.webClient.auth.test(); + this.botUserId = auth.user_id as string; + + await Promise.all([this.fetchUsers(), this.fetchChannels()]); + log.logInfo(`Loaded ${this.channels.size} channels, ${this.users.size} users`); + + await this.backfillAllChannels(); this.setupEventHandlers(); + await this.socketClient.start(); + + // Record startup time - messages older than this are just logged, not processed + this.startupTs = (Date.now() / 1000).toFixed(6); + + log.logConnected(); } - /** - * Fetch all channels the bot is a member of - */ - private async fetchChannels(): Promise { - try { - let cursor: string | undefined; - do { - const result = await this.webClient.conversations.list({ - types: "public_channel,private_channel", - exclude_archived: true, - limit: 200, - cursor, - }); - - const channels = result.channels as Array<{ id?: string; name?: string; is_member?: boolean }> | undefined; - if (channels) { - for (const channel of channels) { - if (channel.id && channel.name && channel.is_member) { - this.channelCache.set(channel.id, channel.name); - } - } - } - - cursor = result.response_metadata?.next_cursor; - } while (cursor); - } catch (error) { - log.logWarning("Failed to fetch channels", String(error)); - } + getUser(userId: string): SlackUser | undefined { + return this.users.get(userId); } - /** - * Fetch all workspace users - */ - private async fetchUsers(): Promise { - try { - let cursor: string | undefined; - do { - const result = await this.webClient.users.list({ - limit: 200, - cursor, - }); - - const members = result.members as - | Array<{ id?: string; name?: string; real_name?: string; deleted?: boolean }> - | undefined; - if (members) { - for (const user of members) { - if (user.id && user.name && !user.deleted) { - this.userCache.set(user.id, { - userName: user.name, - displayName: user.real_name || user.name, - }); - } - } - } - - cursor = result.response_metadata?.next_cursor; - } while (cursor); - } catch (error) { - log.logWarning("Failed to fetch users", String(error)); - } + getChannel(channelId: string): SlackChannel | undefined { + return this.channels.get(channelId); } - /** - * Get all known channels (id -> name) - */ - getChannels(): ChannelInfo[] { - return Array.from(this.channelCache.entries()).map(([id, name]) => ({ id, name })); + getAllUsers(): SlackUser[] { + return Array.from(this.users.values()); } - /** - * Get all known users - */ - getUsers(): UserInfo[] { - return Array.from(this.userCache.entries()).map(([id, { userName, displayName }]) => ({ - id, - userName, - displayName, - })); + getAllChannels(): SlackChannel[] { + return Array.from(this.channels.values()); } - /** - * Obfuscate usernames and user IDs in text to prevent pinging people - * e.g., "nate" -> "n_a_t_e", "@mario" -> "@m_a_r_i_o", "<@U123>" -> "<@U_1_2_3>" - */ - private obfuscateUsernames(text: string): string { - let result = text; + async postMessage(channel: string, text: string): Promise { + const result = await this.webClient.chat.postMessage({ channel, text }); + return result.ts as string; + } - // Obfuscate user IDs like <@U16LAL8LS> - result = result.replace(/<@([A-Z0-9]+)>/gi, (_match, id) => { - return `<@${id.split("").join("_")}>`; + async updateMessage(channel: string, ts: string, text: string): Promise { + await this.webClient.chat.update({ channel, ts, text }); + } + + async postInThread(channel: string, threadTs: string, text: string): Promise { + await this.webClient.chat.postMessage({ channel, thread_ts: threadTs, text }); + } + + async uploadFile(channel: string, filePath: string, title?: string): Promise { + const fileName = title || basename(filePath); + const fileContent = readFileSync(filePath); + await this.webClient.files.uploadV2({ + channel_id: channel, + file: fileContent, + filename: fileName, + title: fileName, }); - - // Obfuscate usernames - for (const { userName } of this.userCache.values()) { - // Escape special regex characters in username - const escaped = userName.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); - // Match @username, <@username>, or bare username (case insensitive, word boundary) - const pattern = new RegExp(`(<@|@)?(\\b${escaped}\\b)`, "gi"); - result = result.replace(pattern, (_match, prefix, name) => { - const obfuscated = name.split("").join("_"); - return (prefix || "") + obfuscated; - }); - } - return result; } - private async getUserInfo(userId: string): Promise<{ userName: string; displayName: string }> { - if (this.userCache.has(userId)) { - return this.userCache.get(userId)!; - } + /** + * Log a message to log.jsonl (SYNC) + * This is the ONLY place messages are written to log.jsonl + */ + logToFile(channel: string, entry: object): void { + const dir = join(this.workingDir, channel); + if (!existsSync(dir)) mkdirSync(dir, { recursive: true }); + appendFileSync(join(dir, "log.jsonl"), JSON.stringify(entry) + "\n"); + } - try { - const result = await this.webClient.users.info({ user: userId }); - const user = result.user as { name?: string; real_name?: string }; - const info = { - userName: user?.name || userId, - displayName: user?.real_name || user?.name || userId, - }; - this.userCache.set(userId, info); - return info; - } catch { - return { userName: userId, displayName: userId }; + /** + * Log a bot response to log.jsonl + */ + logBotResponse(channel: string, text: string, ts: string): void { + this.logToFile(channel, { + date: new Date().toISOString(), + ts, + user: "bot", + text, + attachments: [], + isBot: true, + }); + } + + // ========================================================================== + // Private - Event Handlers + // ========================================================================== + + private getQueue(channelId: string): ChannelQueue { + let queue = this.queues.get(channelId); + if (!queue) { + queue = new ChannelQueue(); + this.queues.set(channelId, queue); } + return queue; } private setupEventHandlers(): void { - // Handle @mentions in channels - this.socketClient.on("app_mention", async ({ event, ack }) => { - await ack(); - - const slackEvent = event as { + // Channel @mentions + this.socketClient.on("app_mention", ({ event, ack }) => { + const e = event as { text: string; channel: string; user: string; @@ -219,24 +242,57 @@ export class MomBot { files?: Array<{ name: string; url_private_download?: string; url_private?: string }>; }; - // Log the mention message (message event may not fire for all channel types) - await this.logMessage({ - text: slackEvent.text, - channel: slackEvent.channel, - user: slackEvent.user, - ts: slackEvent.ts, - files: slackEvent.files, - }); + // Skip DMs (handled by message event) + if (e.channel.startsWith("D")) { + ack(); + return; + } - const ctx = await this.createContext(slackEvent); - await this.handler.onChannelMention(ctx); + const slackEvent: SlackEvent = { + type: "mention", + channel: e.channel, + ts: e.ts, + user: e.user, + text: e.text.replace(/<@[A-Z0-9]+>/gi, "").trim(), + files: e.files, + }; + + // SYNC: Log to log.jsonl (ALWAYS, even for old messages) + this.logUserMessage(slackEvent); + + // Only trigger processing for messages AFTER startup (not replayed old messages) + if (this.startupTs && e.ts < this.startupTs) { + log.logInfo( + `[${e.channel}] Logged old message (pre-startup), not triggering: ${slackEvent.text.substring(0, 30)}`, + ); + ack(); + return; + } + + // Check for stop command - execute immediately, don't queue! + if (slackEvent.text.toLowerCase().trim() === "stop") { + if (this.handler.isRunning(e.channel)) { + this.handler.handleStop(e.channel, this); // Don't await, don't queue + } else { + this.postMessage(e.channel, "_Nothing running_"); + } + ack(); + return; + } + + // SYNC: Check if busy + if (this.handler.isRunning(e.channel)) { + this.postMessage(e.channel, "_Already working. Say `@mom stop` to cancel._"); + } else { + this.getQueue(e.channel).enqueue(() => this.handler.handleEvent(slackEvent, this)); + } + + ack(); }); - // Handle all messages (for logging) and DMs (for triggering handler) - this.socketClient.on("message", async ({ event, ack }) => { - await ack(); - - const slackEvent = event as { + // All messages (for logging) + DMs (for triggering) + this.socketClient.on("message", ({ event, ack }) => { + const e = event as { text?: string; channel: string; user?: string; @@ -247,280 +303,126 @@ export class MomBot { files?: Array<{ name: string; url_private_download?: string; url_private?: string }>; }; - // Ignore bot messages - if (slackEvent.bot_id) return; - // Ignore message edits, etc. (but allow file_share) - if (slackEvent.subtype !== undefined && slackEvent.subtype !== "file_share") return; - // Ignore if no user - if (!slackEvent.user) return; - // Ignore messages from the bot itself - if (slackEvent.user === this.botUserId) return; - // Ignore if no text AND no files - if (!slackEvent.text && (!slackEvent.files || slackEvent.files.length === 0)) return; - - // Log ALL messages (channel and DM) - await this.logMessage({ - text: slackEvent.text || "", - channel: slackEvent.channel, - user: slackEvent.user, - ts: slackEvent.ts, - files: slackEvent.files, - }); - - // Only trigger handler for DMs (channel mentions are handled by app_mention event) - if (slackEvent.channel_type === "im") { - const ctx = await this.createContext({ - text: slackEvent.text || "", - channel: slackEvent.channel, - user: slackEvent.user, - ts: slackEvent.ts, - files: slackEvent.files, - }); - await this.handler.onDirectMessage(ctx); + // Skip bot messages, edits, etc. + if (e.bot_id || !e.user || e.user === this.botUserId) { + ack(); + return; } + if (e.subtype !== undefined && e.subtype !== "file_share") { + ack(); + return; + } + if (!e.text && (!e.files || e.files.length === 0)) { + ack(); + return; + } + + const isDM = e.channel_type === "im"; + const isBotMention = e.text?.includes(`<@${this.botUserId}>`); + + // Skip channel @mentions - already handled by app_mention event + if (!isDM && isBotMention) { + ack(); + return; + } + + const slackEvent: SlackEvent = { + type: isDM ? "dm" : "mention", + channel: e.channel, + ts: e.ts, + user: e.user, + text: (e.text || "").replace(/<@[A-Z0-9]+>/gi, "").trim(), + files: e.files, + }; + + // SYNC: Log to log.jsonl (ALL messages - channel chatter and DMs) + this.logUserMessage(slackEvent); + + // Only trigger processing for messages AFTER startup (not replayed old messages) + if (this.startupTs && e.ts < this.startupTs) { + log.logInfo(`[${e.channel}] Skipping old message (pre-startup): ${slackEvent.text.substring(0, 30)}`); + ack(); + return; + } + + // Only trigger handler for DMs + if (isDM) { + // Check for stop command - execute immediately, don't queue! + if (slackEvent.text.toLowerCase().trim() === "stop") { + if (this.handler.isRunning(e.channel)) { + this.handler.handleStop(e.channel, this); // Don't await, don't queue + } else { + this.postMessage(e.channel, "_Nothing running_"); + } + ack(); + return; + } + + if (this.handler.isRunning(e.channel)) { + this.postMessage(e.channel, "_Already working. Say `stop` to cancel._"); + } else { + this.getQueue(e.channel).enqueue(() => this.handler.handleEvent(slackEvent, this)); + } + } + + ack(); }); } - private async logMessage(event: { - text: string; - channel: string; - user: string; - ts: string; - files?: Array<{ name: string; url_private_download?: string; url_private?: string }>; - }): Promise { - const attachments = event.files ? this.store.processAttachments(event.channel, event.files, event.ts) : []; - const { userName, displayName } = await this.getUserInfo(event.user); - - await this.store.logMessage(event.channel, { + /** + * Log a user message to log.jsonl (SYNC) + */ + private logUserMessage(event: SlackEvent): void { + const user = this.users.get(event.user); + this.logToFile(event.channel, { date: new Date(parseFloat(event.ts) * 1000).toISOString(), ts: event.ts, user: event.user, - userName, - displayName, + userName: user?.userName, + displayName: user?.displayName, text: event.text, - attachments, + attachments: event.files?.map((f) => f.name) || [], isBot: false, }); } - private async createContext(event: { - text: string; - channel: string; - user: string; - ts: string; - files?: Array<{ name: string; url_private_download?: string; url_private?: string }>; - }): Promise { - const rawText = event.text; - const text = rawText.replace(/<@[A-Z0-9]+>/gi, "").trim(); + // ========================================================================== + // Private - Backfill + // ========================================================================== - // Get user info for logging - const { userName } = await this.getUserInfo(event.user); + private getExistingTimestamps(channelId: string): Set { + const logPath = join(this.workingDir, channelId, "log.jsonl"); + const timestamps = new Set(); + if (!existsSync(logPath)) return timestamps; - // Get channel name for logging (best effort) - let channelName: string | undefined; - try { - if (event.channel.startsWith("C")) { - const result = await this.webClient.conversations.info({ channel: event.channel }); - channelName = result.channel?.name ? `#${result.channel.name}` : undefined; - } - } catch { - // Ignore errors - we'll just use the channel ID + const content = readFileSync(logPath, "utf-8"); + const lines = content.trim().split("\n").filter(Boolean); + for (const line of lines) { + try { + const entry = JSON.parse(line); + if (entry.ts) timestamps.add(entry.ts); + } catch {} } - - // Process attachments (for context, already logged by message handler) - const attachments = event.files ? this.store.processAttachments(event.channel, event.files, event.ts) : []; - - // Track the single message for this run - let messageTs: string | null = null; - let accumulatedText = ""; - let isThinking = true; // Track if we're still in "thinking" state - let isWorking = true; // Track if still processing - const workingIndicator = " ..."; - let updatePromise: Promise = Promise.resolve(); - - return { - message: { - text, - rawText, - user: event.user, - userName, - channel: event.channel, - ts: event.ts, - attachments, - }, - channelName, - store: this.store, - channels: this.getChannels(), - users: this.getUsers(), - respond: async (responseText: string, shouldLog = true) => { - // Queue updates to avoid race conditions - updatePromise = updatePromise.then(async () => { - try { - if (isThinking) { - // First real response replaces "Thinking..." - accumulatedText = responseText; - isThinking = false; - } else { - // Subsequent responses get appended - accumulatedText += "\n" + responseText; - } - - // Truncate accumulated text if too long (Slack limit is 40K, we use 35K for safety) - const MAX_MAIN_LENGTH = 35000; - const truncationNote = "\n\n_(message truncated, ask me to elaborate on specific parts)_"; - if (accumulatedText.length > MAX_MAIN_LENGTH) { - accumulatedText = - accumulatedText.substring(0, MAX_MAIN_LENGTH - truncationNote.length) + truncationNote; - } - - // Add working indicator if still working - const displayText = isWorking ? accumulatedText + workingIndicator : accumulatedText; - - if (messageTs) { - // Update existing message - await this.webClient.chat.update({ - channel: event.channel, - ts: messageTs, - text: displayText, - }); - } else { - // Post initial message - const result = await this.webClient.chat.postMessage({ - channel: event.channel, - text: displayText, - }); - messageTs = result.ts as string; - } - - // Log the response if requested - if (shouldLog) { - await this.store.logBotResponse(event.channel, responseText, messageTs!); - } - } catch (err) { - log.logWarning("Slack respond error", err instanceof Error ? err.message : String(err)); - } - }); - - await updatePromise; - }, - respondInThread: async (threadText: string) => { - // Queue thread posts to maintain order - updatePromise = updatePromise.then(async () => { - try { - if (!messageTs) { - // No main message yet, just skip - return; - } - // Obfuscate usernames to avoid pinging people in thread details - let obfuscatedText = this.obfuscateUsernames(threadText); - - // Truncate thread messages if too long (20K limit for safety) - const MAX_THREAD_LENGTH = 20000; - if (obfuscatedText.length > MAX_THREAD_LENGTH) { - obfuscatedText = obfuscatedText.substring(0, MAX_THREAD_LENGTH - 50) + "\n\n_(truncated)_"; - } - - // Post in thread under the main message - await this.webClient.chat.postMessage({ - channel: event.channel, - thread_ts: messageTs, - text: obfuscatedText, - }); - } catch (err) { - log.logWarning("Slack respondInThread error", err instanceof Error ? err.message : String(err)); - } - }); - await updatePromise; - }, - setTyping: async (isTyping: boolean) => { - if (isTyping && !messageTs) { - // Post initial "thinking" message (... auto-appended by working indicator) - accumulatedText = "_Thinking_"; - const result = await this.webClient.chat.postMessage({ - channel: event.channel, - text: accumulatedText, - }); - messageTs = result.ts as string; - } - // We don't delete/clear anymore - message persists and gets updated - }, - uploadFile: async (filePath: string, title?: string) => { - const fileName = title || basename(filePath); - const fileContent = readFileSync(filePath); - - await this.webClient.files.uploadV2({ - channel_id: event.channel, - file: fileContent, - filename: fileName, - title: fileName, - }); - }, - replaceMessage: async (text: string) => { - updatePromise = updatePromise.then(async () => { - try { - // Replace the accumulated text entirely, with truncation - const MAX_MAIN_LENGTH = 35000; - const truncationNote = "\n\n_(message truncated, ask me to elaborate on specific parts)_"; - if (text.length > MAX_MAIN_LENGTH) { - accumulatedText = text.substring(0, MAX_MAIN_LENGTH - truncationNote.length) + truncationNote; - } else { - accumulatedText = text; - } - - const displayText = isWorking ? accumulatedText + workingIndicator : accumulatedText; - - if (messageTs) { - await this.webClient.chat.update({ - channel: event.channel, - ts: messageTs, - text: displayText, - }); - } else { - // Post initial message - const result = await this.webClient.chat.postMessage({ - channel: event.channel, - text: displayText, - }); - messageTs = result.ts as string; - } - } catch (err) { - log.logWarning("Slack replaceMessage error", err instanceof Error ? err.message : String(err)); - } - }); - await updatePromise; - }, - setWorking: async (working: boolean) => { - updatePromise = updatePromise.then(async () => { - try { - isWorking = working; - - // If we have a message, update it to add/remove indicator - if (messageTs) { - const displayText = isWorking ? accumulatedText + workingIndicator : accumulatedText; - await this.webClient.chat.update({ - channel: event.channel, - ts: messageTs, - text: displayText, - }); - } - } catch (err) { - log.logWarning("Slack setWorking error", err instanceof Error ? err.message : String(err)); - } - }); - await updatePromise; - }, - }; + return timestamps; } - /** - * Backfill missed messages for a single channel - * Returns the number of messages backfilled - */ private async backfillChannel(channelId: string): Promise { - const lastTs = this.store.getLastTimestamp(channelId); + const existingTs = this.getExistingTimestamps(channelId); - // Collect messages from up to 3 pages - type Message = NonNullable[number]; + // Find the biggest ts in log.jsonl + let latestTs: string | undefined; + for (const ts of existingTs) { + if (!latestTs || parseFloat(ts) > parseFloat(latestTs)) latestTs = ts; + } + + type Message = { + user?: string; + bot_id?: string; + text?: string; + ts?: string; + subtype?: string; + files?: Array<{ name: string }>; + }; const allMessages: Message[] = []; let cursor: string | undefined; @@ -530,88 +432,76 @@ export class MomBot { do { const result = await this.webClient.conversations.history({ channel: channelId, - oldest: lastTs ?? undefined, + oldest: latestTs, // Only fetch messages newer than what we have inclusive: false, limit: 1000, cursor, }); - if (result.messages) { - allMessages.push(...result.messages); + allMessages.push(...(result.messages as Message[])); } - cursor = result.response_metadata?.next_cursor; pageCount++; } while (cursor && pageCount < maxPages); - // Filter messages: include mom's messages, exclude other bots + // Filter: include mom's messages, exclude other bots, skip already logged const relevantMessages = allMessages.filter((msg) => { - // Always include mom's own messages + if (!msg.ts || existingTs.has(msg.ts)) return false; // Skip duplicates if (msg.user === this.botUserId) return true; - // Exclude other bot messages if (msg.bot_id) return false; - // Standard filters for user messages if (msg.subtype !== undefined && msg.subtype !== "file_share") return false; if (!msg.user) return false; if (!msg.text && (!msg.files || msg.files.length === 0)) return false; return true; }); - // Reverse to chronological order (API returns newest first) + // Reverse to chronological order relevantMessages.reverse(); - // Log each message + // Log each message to log.jsonl for (const msg of relevantMessages) { const isMomMessage = msg.user === this.botUserId; - const attachments = msg.files ? this.store.processAttachments(channelId, msg.files, msg.ts!) : []; + const user = this.users.get(msg.user!); + // Strip @mentions from text (same as live messages) + const text = (msg.text || "").replace(/<@[A-Z0-9]+>/gi, "").trim(); - if (isMomMessage) { - // Log mom's message as bot response - await this.store.logMessage(channelId, { - date: new Date(parseFloat(msg.ts!) * 1000).toISOString(), - ts: msg.ts!, - user: "bot", - text: msg.text || "", - attachments, - isBot: true, - }); - } else { - // Log user message - const { userName, displayName } = await this.getUserInfo(msg.user!); - await this.store.logMessage(channelId, { - date: new Date(parseFloat(msg.ts!) * 1000).toISOString(), - ts: msg.ts!, - user: msg.user!, - userName, - displayName, - text: msg.text || "", - attachments, - isBot: false, - }); - } + this.logToFile(channelId, { + date: new Date(parseFloat(msg.ts!) * 1000).toISOString(), + ts: msg.ts!, + user: isMomMessage ? "bot" : msg.user!, + userName: isMomMessage ? undefined : user?.userName, + displayName: isMomMessage ? undefined : user?.displayName, + text, + attachments: msg.files?.map((f) => f.name) || [], + isBot: isMomMessage, + }); } return relevantMessages.length; } - /** - * Backfill missed messages for all channels - */ private async backfillAllChannels(): Promise { const startTime = Date.now(); - log.logBackfillStart(this.channelCache.size); + + // Only backfill channels that already have a log.jsonl (mom has interacted with them before) + const channelsToBackfill: Array<[string, SlackChannel]> = []; + for (const [channelId, channel] of this.channels) { + const logPath = join(this.workingDir, channelId, "log.jsonl"); + if (existsSync(logPath)) { + channelsToBackfill.push([channelId, channel]); + } + } + + log.logBackfillStart(channelsToBackfill.length); let totalMessages = 0; - - for (const [channelId, channelName] of this.channelCache) { + for (const [channelId, channel] of channelsToBackfill) { try { const count = await this.backfillChannel(channelId); - if (count > 0) { - log.logBackfillChannel(channelName, count); - } + if (count > 0) log.logBackfillChannel(channel.name, count); totalMessages += count; } catch (error) { - log.logWarning(`Failed to backfill channel #${channelName}`, String(error)); + log.logWarning(`Failed to backfill #${channel.name}`, String(error)); } } @@ -619,23 +509,69 @@ export class MomBot { log.logBackfillComplete(totalMessages, durationMs); } - async start(): Promise { - const auth = await this.webClient.auth.test(); - this.botUserId = auth.user_id as string; + // ========================================================================== + // Private - Fetch Users/Channels + // ========================================================================== - // Fetch channels and users in parallel - await Promise.all([this.fetchChannels(), this.fetchUsers()]); - log.logInfo(`Loaded ${this.channelCache.size} channels, ${this.userCache.size} users`); - - // Backfill any messages missed while offline - await this.backfillAllChannels(); - - await this.socketClient.start(); - log.logConnected(); + private async fetchUsers(): Promise { + let cursor: string | undefined; + do { + const result = await this.webClient.users.list({ limit: 200, cursor }); + const members = result.members as + | Array<{ id?: string; name?: string; real_name?: string; deleted?: boolean }> + | undefined; + if (members) { + for (const u of members) { + if (u.id && u.name && !u.deleted) { + this.users.set(u.id, { id: u.id, userName: u.name, displayName: u.real_name || u.name }); + } + } + } + cursor = result.response_metadata?.next_cursor; + } while (cursor); } - async stop(): Promise { - await this.socketClient.disconnect(); - log.logDisconnected(); + private async fetchChannels(): Promise { + // Fetch public/private channels + let cursor: string | undefined; + do { + const result = await this.webClient.conversations.list({ + types: "public_channel,private_channel", + exclude_archived: true, + limit: 200, + cursor, + }); + const channels = result.channels as Array<{ id?: string; name?: string; is_member?: boolean }> | undefined; + if (channels) { + for (const c of channels) { + if (c.id && c.name && c.is_member) { + this.channels.set(c.id, { id: c.id, name: c.name }); + } + } + } + cursor = result.response_metadata?.next_cursor; + } while (cursor); + + // Also fetch DM channels (IMs) + cursor = undefined; + do { + const result = await this.webClient.conversations.list({ + types: "im", + limit: 200, + cursor, + }); + const ims = result.channels as Array<{ id?: string; user?: string }> | undefined; + if (ims) { + for (const im of ims) { + if (im.id) { + // Use user's name as channel name for DMs + const user = im.user ? this.users.get(im.user) : undefined; + const name = user ? `DM:${user.userName}` : `DM:${im.id}`; + this.channels.set(im.id, { id: im.id, name }); + } + } + } + cursor = result.response_metadata?.next_cursor; + } while (cursor); } }