diff --git a/packages/mom/CHANGELOG.md b/packages/mom/CHANGELOG.md
index 509f88c5..b61b97b1 100644
--- a/packages/mom/CHANGELOG.md
+++ b/packages/mom/CHANGELOG.md
@@ -2,6 +2,15 @@
 
 ## [Unreleased]
 
+### Added
+
+- Message backfill on startup (#103)
+  - Fetches missed messages from Slack using `conversations.history` API when mom restarts
+  - Backfills up to 3 pages (3000 messages) per channel since last logged timestamp
+  - Includes mom's own responses and user messages (excludes other bots)
+  - Downloads attachments from backfilled messages
+  - Logs progress: channel count, per-channel message counts, total with duration
+
 ## [0.10.2] - 2025-11-27
 
 ### Breaking Changes
diff --git a/packages/mom/src/log.ts b/packages/mom/src/log.ts
index e8d3ed86..ee1ac983 100644
--- a/packages/mom/src/log.ts
+++ b/packages/mom/src/log.ts
@@ -242,3 +242,20 @@ export function logConnected(): void {
 export function logDisconnected(): void {
 	console.log("Mom bot disconnected.");
 }
+
+// Backfill
+/** Announce the start of a backfill run covering `channelCount` channels. */
+export function logBackfillStart(channelCount: number): void {
+	console.log(chalk.blue(`${timestamp()} [system] Backfilling ${channelCount} channels...`));
+}
+
+/** Report how many messages were backfilled for one channel. */
+export function logBackfillChannel(channelName: string, messageCount: number): void {
+	console.log(chalk.blue(`${timestamp()} [system] #${channelName}: ${messageCount} messages`));
+}
+
+/** Report the backfill total; `durationMs` is printed in seconds with one decimal place. */
+export function logBackfillComplete(totalMessages: number, durationMs: number): void {
+	const duration = (durationMs / 1000).toFixed(1);
+	console.log(chalk.blue(`${timestamp()} [system] Backfill complete: ${totalMessages} messages in ${duration}s`));
+}
diff --git a/packages/mom/src/slack.ts b/packages/mom/src/slack.ts
index c5766722..1065ac59 100644
--- a/packages/mom/src/slack.ts
+++ b/packages/mom/src/slack.ts
@@ -1,5 +1,5 @@
 import { SocketModeClient } from "@slack/socket-mode";
-import { WebClient } from "@slack/web-api";
+import { type ConversationsHistoryResponse, WebClient } from "@slack/web-api";
 import { readFileSync } from "fs";
 import { basename } from "path";
 import * as log from "./log.js";
@@ -467,6 +467,126 @@ export class MomBot {
 		};
 	}
 
+	/**
+	 * Backfill missed messages for a single channel.
+	 *
+	 * Fetches up to 3 pages of `conversations.history` newer than the last logged timestamp
+	 * (no lower bound when nothing has been logged yet), keeps mom's own messages and regular
+	 * user messages, and logs them to the store in chronological order.
+	 *
+	 * @param channelId - ID of the channel to backfill
+	 * @returns The number of messages backfilled
+	 */
+	private async backfillChannel(channelId: string): Promise<number> {
+		const lastTs = this.store.getLastTimestamp(channelId);
+
+		// Collect messages from up to 3 pages
+		type Message = NonNullable<ConversationsHistoryResponse["messages"]>[number];
+		const allMessages: Message[] = [];
+
+		let cursor: string | undefined;
+		let pageCount = 0;
+		const maxPages = 3;
+
+		do {
+			const result = await this.webClient.conversations.history({
+				channel: channelId,
+				oldest: lastTs ?? undefined,
+				inclusive: false,
+				limit: 1000,
+				cursor,
+			});
+
+			if (result.messages) {
+				allMessages.push(...result.messages);
+			}
+
+			cursor = result.response_metadata?.next_cursor;
+			pageCount++;
+		} while (cursor && pageCount < maxPages);
+
+		// Filter messages: include mom's messages, exclude other bots
+		type DatedMessage = Message & { ts: string };
+		const relevantMessages = allMessages.filter((msg): msg is DatedMessage => {
+			// Drop messages whose ts is missing or non-numeric: they cannot be dated, and
+			// new Date(NaN).toISOString() would throw and abort the rest of the channel
+			if (!msg.ts || !Number.isFinite(parseFloat(msg.ts))) return false;
+			// Always include mom's own messages
+			if (msg.user === this.botUserId) return true;
+			// Exclude other bot messages
+			if (msg.bot_id) return false;
+			// Standard filters for user messages
+			if (msg.subtype !== undefined && msg.subtype !== "file_share") return false;
+			if (!msg.user) return false;
+			if (!msg.text && (!msg.files || msg.files.length === 0)) return false;
+			return true;
+		});
+
+		// Reverse to chronological order (API returns newest first)
+		relevantMessages.reverse();
+
+		// Log each message
+		for (const msg of relevantMessages) {
+			const isMomMessage = msg.user === this.botUserId;
+			const date = new Date(parseFloat(msg.ts) * 1000).toISOString();
+			const attachments = msg.files ? this.store.processAttachments(channelId, msg.files, msg.ts) : [];
+
+			if (isMomMessage) {
+				// Log mom's message as bot response
+				await this.store.logMessage(channelId, {
+					date,
+					ts: msg.ts,
+					user: "bot",
+					text: msg.text || "",
+					attachments,
+					isBot: true,
+				});
+			} else {
+				// Log user message
+				const { userName, displayName } = await this.getUserInfo(msg.user!);
+				await this.store.logMessage(channelId, {
+					date,
+					ts: msg.ts,
+					user: msg.user!,
+					userName,
+					displayName,
+					text: msg.text || "",
+					attachments,
+					isBot: false,
+				});
+			}
+		}
+
+		return relevantMessages.length;
+	}
+
+	/**
+	 * Backfill missed messages for all channels in the channel cache.
+	 * Channels are processed one at a time; a failure in one channel is logged as a
+	 * warning and does not stop the remaining channels.
+	 */
+	private async backfillAllChannels(): Promise<void> {
+		const startTime = Date.now();
+		log.logBackfillStart(this.channelCache.size);
+
+		let totalMessages = 0;
+
+		for (const [channelId, channelName] of this.channelCache) {
+			try {
+				const count = await this.backfillChannel(channelId);
+				if (count > 0) {
+					log.logBackfillChannel(channelName, count);
+				}
+				totalMessages += count;
+			} catch (error) {
+				log.logWarning(`Failed to backfill channel #${channelName}`, String(error));
+			}
+		}
+
+		const durationMs = Date.now() - startTime;
+		log.logBackfillComplete(totalMessages, durationMs);
+	}
+
 	async start(): Promise<void> {
 		const auth = await this.webClient.auth.test();
 		this.botUserId = auth.user_id as string;
@@ -475,6 +595,9 @@ export class MomBot {
 		await Promise.all([this.fetchChannels(), this.fetchUsers()]);
 		log.logInfo(`Loaded ${this.channelCache.size} channels, ${this.userCache.size} users`);
 
+		// Backfill any messages missed while offline
+		await this.backfillAllChannels();
+
 		await this.socketClient.start();
 		log.logConnected();
 	}
diff --git a/packages/mom/src/store.ts b/packages/mom/src/store.ts
index 8ac02aa5..d319cee4 100644
--- a/packages/mom/src/store.ts
+++ b/packages/mom/src/store.ts
@@ -1,4 +1,4 @@
-import { existsSync, mkdirSync } from "fs";
+import { existsSync, mkdirSync, readFileSync } from "fs";
 import { appendFile, writeFile } from "fs/promises";
 import { join } from "path";
 import * as log from "./log.js";
@@ -74,7 +74,7 @@ export class ChannelStore {
 	 */
 	processAttachments(
 		channelId: string,
-		files: Array<{ name: string; url_private_download?: string; url_private?: string }>,
+		files: Array<{ name?: string; url_private_download?: string; url_private?: string }>,
 		timestamp: string,
 	): Attachment[] {
 		const attachments: Attachment[] = [];
@@ -82,6 +82,10 @@ export class ChannelStore {
 		for (const file of files) {
 			const url = file.url_private_download || file.url_private;
 			if (!url) continue;
+			if (!file.name) {
+				log.logWarning("Attachment missing name, skipping", url);
+				continue;
+			}
 
 			const filename = this.generateLocalFilename(file.name, timestamp);
 			const localPath = `${channelId}/attachments/${filename}`;
@@ -139,6 +143,46 @@ export class ChannelStore {
 		});
 	}
 
+	/**
+	 * Get the timestamp of the last logged message for a channel.
+	 *
+	 * Scans the channel's `log.jsonl` from the end and returns the `ts` of the last line that
+	 * parses as JSON and carries a non-empty string `ts`.
+	 *
+	 * @param channelId - Channel whose log should be inspected
+	 * @returns The timestamp, or null if no log exists, it cannot be read, or no line qualifies
+	 */
+	getLastTimestamp(channelId: string): string | null {
+		const logPath = join(this.workingDir, channelId, "log.jsonl");
+		if (!existsSync(logPath)) {
+			return null;
+		}
+
+		let content: string;
+		try {
+			content = readFileSync(logPath, "utf-8");
+		} catch {
+			return null;
+		}
+
+		// Walk backwards so a blank or truncated final line (e.g. an interrupted append) falls
+		// back to the previous entry instead of returning null and re-backfilling from scratch
+		for (const rawLine of content.split("\n").reverse()) {
+			const line = rawLine.trim();
+			if (line === "") continue;
+			try {
+				const message = JSON.parse(line) as Partial<LoggedMessage> | null;
+				if (message && typeof message.ts === "string" && message.ts !== "") {
+					return message.ts;
+				}
+			} catch {
+				// Not valid JSON; keep looking at earlier lines
+			}
+		}
+
+		return null;
+	}
+
 	/**
 	 * Process the download queue in the background
 	 */