co-mono/packages/mom/src/main.ts
Mario Zechner f1451fd8f0 mom: fix [SILENT] to delete thread messages too
Track all thread message timestamps during a run. When response is
[SILENT], delete all thread messages before deleting the main message.
This prevents the 'This message was deleted' tombstone in Slack.
2025-12-12 22:56:56 +01:00

334 lines
10 KiB
JavaScript

#!/usr/bin/env node
import { join, resolve } from "path";
import { type AgentRunner, getOrCreateRunner } from "./agent.js";
import { syncLogToContext } from "./context.js";
import { downloadChannel } from "./download.js";
import { createEventsWatcher } from "./events.js";
import * as log from "./log.js";
import { parseSandboxArg, type SandboxConfig, validateSandbox } from "./sandbox.js";
import { type MomHandler, type SlackBot, SlackBot as SlackBotClass, type SlackEvent } from "./slack.js";
import { ChannelStore } from "./store.js";
// ============================================================================
// Config
// ============================================================================
// Credentials are read once from the environment at module load.
// Each binding is `string | undefined`; presence is validated further down,
// depending on which mode (download vs. bot) was requested.
const {
	MOM_SLACK_APP_TOKEN,
	MOM_SLACK_BOT_TOKEN,
	ANTHROPIC_API_KEY,
	ANTHROPIC_OAUTH_TOKEN,
} = process.env;
/** Command-line options as returned by parseArgs(). */
interface ParsedArgs {
/** Positional (non-dash) argument passed through path.resolve(); undefined when none was given. */
workingDir?: string;
/** Sandbox chosen with --sandbox / --sandbox=...; { type: "host" } when the flag is absent. */
sandbox: SandboxConfig;
/** Channel id given with --download / --download=...; when set, the process downloads that channel and exits. */
downloadChannel?: string;
}
/**
 * Parses the process command line (everything after the script path).
 *
 * Recognised forms:
 *   --sandbox=<spec> | --sandbox <spec>   sandbox spec, handed to parseSandboxArg
 *   --download=<id>  | --download <id>    channel id for download mode
 *   <anything not starting with "-">      working directory (last one wins)
 *
 * Any other dash-prefixed argument is ignored.
 *
 * @returns the parsed options; workingDir is resolved via path.resolve when present.
 */
function parseArgs(): ParsedArgs {
	const sandboxPrefix = "--sandbox=";
	const downloadPrefix = "--download=";
	const argv = process.argv.slice(2);

	let sandbox: SandboxConfig = { type: "host" };
	let positional: string | undefined;
	let channelToDownload: string | undefined;

	let cursor = 0;
	while (cursor < argv.length) {
		const token = argv[cursor++];

		if (token.startsWith(sandboxPrefix)) {
			sandbox = parseSandboxArg(token.slice(sandboxPrefix.length));
			continue;
		}
		if (token === "--sandbox") {
			// Space-separated form: consume the following argument as the value
			// (empty string when the flag is the last argument).
			sandbox = parseSandboxArg(argv[cursor++] || "");
			continue;
		}
		if (token.startsWith(downloadPrefix)) {
			channelToDownload = token.slice(downloadPrefix.length);
			continue;
		}
		if (token === "--download") {
			// Space-separated form: value may be undefined if nothing follows.
			channelToDownload = argv[cursor++];
			continue;
		}
		if (!token.startsWith("-")) {
			positional = token;
		}
	}

	return {
		workingDir: positional ? resolve(positional) : undefined,
		sandbox,
		downloadChannel: channelToDownload,
	};
}
const parsedArgs = parseArgs();

// Download mode: only the bot token is needed. Fetch the channel and exit
// without ever starting the bot.
if (parsedArgs.downloadChannel) {
	if (!MOM_SLACK_BOT_TOKEN) {
		console.error("Missing env: MOM_SLACK_BOT_TOKEN");
		process.exit(1);
	}
	await downloadChannel(parsedArgs.downloadChannel, MOM_SLACK_BOT_TOKEN);
	process.exit(0);
}

// Bot mode: a working directory is mandatory; print usage and bail out otherwise.
if (!parsedArgs.workingDir) {
	const usageLines = [
		"Usage: mom [--sandbox=host|docker:<name>] <working-directory>",
		" mom --download <channel-id>",
	];
	for (const line of usageLines) {
		console.error(line);
	}
	process.exit(1);
}

// Past the guard above, workingDir is known to be a string.
const workingDir = parsedArgs.workingDir;
const sandbox = parsedArgs.sandbox;

// Bot mode needs both Slack tokens plus at least one Anthropic credential.
const hasAnthropicCredential = Boolean(ANTHROPIC_API_KEY || ANTHROPIC_OAUTH_TOKEN);
if (!(MOM_SLACK_APP_TOKEN && MOM_SLACK_BOT_TOKEN && hasAnthropicCredential)) {
	console.error("Missing env: MOM_SLACK_APP_TOKEN, MOM_SLACK_BOT_TOKEN, ANTHROPIC_API_KEY or ANTHROPIC_OAUTH_TOKEN");
	process.exit(1);
}

await validateSandbox(sandbox);
// ============================================================================
// State (per channel)
// ============================================================================
/** Per-channel runtime state, created on first use by getState() and kept in channelStates. */
interface ChannelState {
/** True while handleEvent is executing a run for this channel; reset in its finally block. */
running: boolean;
/** Agent runner for this channel, obtained from getOrCreateRunner(). */
runner: AgentRunner;
/** ChannelStore instance handed to the runner and exposed on the Slack context. */
store: ChannelStore;
/** Set by handleStop when a stop is requested; cleared at the start of every run. */
stopRequested: boolean;
/** Timestamp of the "_Stopping..._" message posted by handleStop, so it can later be rewritten to "_Stopped_". */
stopMessageTs?: string;
}
// Channel id -> state; entries are created on demand and never removed here.
const channelStates = new Map<string, ChannelState>();

/**
 * Returns the state for a channel, creating and caching it on first access.
 *
 * @param channelId Slack channel id; also used as the sub-directory name under workingDir.
 * @returns the cached or newly created ChannelState.
 */
function getState(channelId: string): ChannelState {
	const cached = channelStates.get(channelId);
	if (cached) {
		return cached;
	}

	const channelDir = join(workingDir, channelId);
	const created: ChannelState = {
		running: false,
		runner: getOrCreateRunner(sandbox, channelId, channelDir),
		store: new ChannelStore({ workingDir, botToken: MOM_SLACK_BOT_TOKEN! }),
		stopRequested: false,
	};
	channelStates.set(channelId, created);
	return created;
}
// ============================================================================
// Create SlackContext adapter
// ============================================================================
/**
 * Builds the context object handed to the agent runner for one Slack event.
 *
 * The context owns a single "main" Slack message (posted lazily) whose text is
 * accumulated across respond() calls, plus any thread replies posted under it.
 * All operations that touch the main message or its thread are serialized
 * through an internal promise queue so they hit Slack in call order.
 *
 * A failing operation rejects for its own caller only: the queue itself keeps
 * running, so a single failed Slack call does not prevent later updates
 * (e.g. clearing the working indicator or deleting the message).
 *
 * @param event   The Slack event being handled.
 * @param slack   Bot used for all Slack API calls and user/channel lookups.
 * @param state   Per-channel state; only its store is exposed on the context.
 * @param isEvent When truthy, event.text is inspected for an "[EVENT:<name>:" prefix
 *                and the name is shown in the initial status message.
 * @returns the context object (message metadata, lookups and async output methods).
 */
function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelState, isEvent?: boolean) {
	let messageTs: string | null = null;
	const threadMessageTs: string[] = [];
	let accumulatedText = "";
	let isWorking = true;
	const workingIndicator = " ...";
	let updatePromise: Promise<void> = Promise.resolve();
	const user = slack.getUser(event.user);
	// Extract event filename for status message
	const eventFilename = isEvent ? event.text.match(/^\[EVENT:([^:]+):/)?.[1] : undefined;

	// Appends `task` to the serialized queue. The returned promise settles with
	// the task's own outcome, while the stored queue tail swallows the rejection
	// so that subsequent tasks still run after a failure.
	const enqueue = (task: () => Promise<void>): Promise<void> => {
		const run = updatePromise.then(task);
		updatePromise = run.catch(() => {});
		return run;
	};

	// Text to show in the main message, with the working indicator while active.
	const renderText = (): string => (isWorking ? accumulatedText + workingIndicator : accumulatedText);

	// Updates the main message if it exists, otherwise posts it and remembers its ts.
	const postOrUpdate = async (displayText: string): Promise<void> => {
		if (messageTs) {
			await slack.updateMessage(event.channel, messageTs, displayText);
		} else {
			messageTs = await slack.postMessage(event.channel, displayText);
		}
	};

	return {
		message: {
			text: event.text,
			rawText: event.text,
			user: event.user,
			userName: user?.userName,
			channel: event.channel,
			ts: event.ts,
			attachments: (event.attachments || []).map((a) => ({ local: a.local })),
		},
		channelName: slack.getChannel(event.channel)?.name,
		store: state.store,
		channels: slack.getAllChannels().map((c) => ({ id: c.id, name: c.name })),
		users: slack.getAllUsers().map((u) => ({ id: u.id, userName: u.userName, displayName: u.displayName })),

		// Appends text (newline-separated) to the main message; optionally logs it.
		respond: async (text: string, shouldLog = true) => {
			await enqueue(async () => {
				accumulatedText = accumulatedText ? accumulatedText + "\n" + text : text;
				await postOrUpdate(renderText());
				if (shouldLog && messageTs) {
					slack.logBotResponse(event.channel, text, messageTs);
				}
			});
		},

		// Replaces the whole accumulated text of the main message.
		replaceMessage: async (text: string) => {
			await enqueue(async () => {
				accumulatedText = text;
				await postOrUpdate(renderText());
			});
		},

		// Posts a reply in the main message's thread; no-op until the main message exists.
		respondInThread: async (text: string) => {
			await enqueue(async () => {
				if (messageTs) {
					const ts = await slack.postInThread(event.channel, messageTs, text);
					// Remember the reply so deleteMessage can remove it as well
					threadMessageTs.push(ts);
				}
			});
		},

		// Posts the initial status message if nothing has been posted yet.
		setTyping: async (isTyping: boolean) => {
			if (!isTyping || messageTs) {
				return;
			}
			await enqueue(async () => {
				// Re-check inside the queue: an earlier queued operation may have posted already
				if (!messageTs) {
					accumulatedText = eventFilename ? `_Starting event: ${eventFilename}_` : "_Thinking_";
					messageTs = await slack.postMessage(event.channel, accumulatedText + workingIndicator);
				}
			});
		},

		// Not queued: uploads do not touch the main message.
		uploadFile: async (filePath: string, title?: string) => {
			await slack.uploadFile(event.channel, filePath, title);
		},

		// Toggles the working indicator and re-renders the main message if present.
		setWorking: async (working: boolean) => {
			await enqueue(async () => {
				isWorking = working;
				if (messageTs) {
					await slack.updateMessage(event.channel, messageTs, renderText());
				}
			});
		},

		// Deletes all tracked thread replies, then the main message.
		deleteMessage: async () => {
			await enqueue(async () => {
				// Delete thread messages first (in reverse order)
				for (let i = threadMessageTs.length - 1; i >= 0; i--) {
					try {
						await slack.deleteMessage(event.channel, threadMessageTs[i]);
					} catch {
						// Ignore errors deleting thread messages (best effort)
					}
				}
				threadMessageTs.length = 0;
				// Then delete main message
				if (messageTs) {
					await slack.deleteMessage(event.channel, messageTs);
					messageTs = null;
				}
			});
		},
	};
}
// ============================================================================
// Handler
// ============================================================================
const handler: MomHandler = {
	/** Reports whether a run is currently in progress for the given channel. */
	isRunning(channelId: string): boolean {
		return channelStates.get(channelId)?.running ?? false;
	},

	/**
	 * Handles a stop request: aborts the active run for the channel and posts a
	 * "_Stopping..._" notice, or posts "_Nothing running_" when there is no run.
	 */
	async handleStop(channelId: string, slack: SlackBot): Promise<void> {
		const state = channelStates.get(channelId);
		if (!state?.running) {
			await slack.postMessage(channelId, "_Nothing running_");
			return;
		}

		state.stopRequested = true;
		state.runner.abort();
		// Keep the notice's ts so the run can rewrite it to "_Stopped_" once it ends
		state.stopMessageTs = await slack.postMessage(channelId, "_Stopping..._");
	},

	/**
	 * Runs the agent for one incoming event: syncs logged messages into the
	 * context, creates the Slack context adapter, executes the runner and, if the
	 * run was aborted on request, reports "_Stopped_". Errors are logged as
	 * warnings; the running flag is always cleared afterwards.
	 */
	async handleEvent(event: SlackEvent, slack: SlackBot, isEvent?: boolean): Promise<void> {
		const channelId = event.channel;
		const state = getState(channelId);
		const channelDir = join(workingDir, channelId);

		// Start run
		state.running = true;
		state.stopRequested = false;
		log.logInfo(`[${channelId}] Starting run: ${event.text.substring(0, 50)}`);

		try {
			// Bring context up to date from log.jsonl before processing: picks up
			// messages logged while mom wasn't running, excluding anything at or
			// after the current ts (the agent handles that itself).
			const syncedCount = syncLogToContext(channelDir, event.ts);
			if (syncedCount > 0) {
				log.logInfo(`[${channelId}] Synced ${syncedCount} messages from log to context`);
			}

			const ctx = createSlackContext(event, slack, state, isEvent);

			// Show the status message with working indicator, run, then clear the indicator
			await ctx.setTyping(true);
			await ctx.setWorking(true);
			const outcome = await state.runner.run(ctx as any, state.store);
			await ctx.setWorking(false);

			const stoppedOnRequest = outcome.stopReason === "aborted" && state.stopRequested;
			if (stoppedOnRequest) {
				const noticeTs = state.stopMessageTs;
				if (noticeTs) {
					// Rewrite the earlier "_Stopping..._" notice in place
					await slack.updateMessage(channelId, noticeTs, "_Stopped_");
					state.stopMessageTs = undefined;
				} else {
					await slack.postMessage(channelId, "_Stopped_");
				}
			}
		} catch (err) {
			const detail = err instanceof Error ? err.message : String(err);
			log.logWarning(`[${channelId}] Run error`, detail);
		} finally {
			state.running = false;
		}
	},
};
// ============================================================================
// Start
// ============================================================================
const sandboxLabel = sandbox.type === "host" ? "host" : `docker:${sandbox.container}`;
log.logStartup(workingDir, sandboxLabel);

// Store given to the bot (attachment downloads); getState() creates separate
// ChannelStore instances per channel with the same settings.
const sharedStore = new ChannelStore({ workingDir, botToken: MOM_SLACK_BOT_TOKEN! });

const bot = new SlackBotClass(handler, {
	appToken: MOM_SLACK_APP_TOKEN,
	botToken: MOM_SLACK_BOT_TOKEN,
	workingDir,
	store: sharedStore,
});

// Start events watcher
const eventsWatcher = createEventsWatcher(workingDir, bot);
eventsWatcher.start();

// Shared shutdown path for SIGINT and SIGTERM: log, stop the watcher, exit cleanly.
function shutdown(): void {
	log.logInfo("Shutting down...");
	eventsWatcher.stop();
	process.exit(0);
}
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);

bot.start();