mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-17 08:00:59 +00:00
mom: rewrite message handling - log.jsonl and context.jsonl sync
- log.jsonl is source of truth, context.jsonl syncs from it at run start - Backfill fetches missing messages from Slack API on startup - Messages sent while mom is busy are logged and synced on next run - Channel chatter (no @mention) logged but doesn't trigger processing - Pre-startup messages (replayed by Slack) logged but not processed - Stop command executes immediately, not queued - Session header written immediately on new session creation - Deduplicate messages by timestamp - Strip @mentions from backfilled messages - Remove old slack.ts and main.ts, rename *-new.ts versions
This commit is contained in:
parent
e513127b3b
commit
99fe4802ef
5 changed files with 1142 additions and 969 deletions
File diff suppressed because one or more lines are too long
|
|
@ -2,7 +2,7 @@ import { Agent, type AgentEvent, ProviderTransport } from "@mariozechner/pi-agen
|
||||||
import { getModel } from "@mariozechner/pi-ai";
|
import { getModel } from "@mariozechner/pi-ai";
|
||||||
import { AgentSession, messageTransformer } from "@mariozechner/pi-coding-agent";
|
import { AgentSession, messageTransformer } from "@mariozechner/pi-coding-agent";
|
||||||
import { existsSync, readFileSync } from "fs";
|
import { existsSync, readFileSync } from "fs";
|
||||||
import { mkdir } from "fs/promises";
|
import { mkdir, writeFile } from "fs/promises";
|
||||||
import { join } from "path";
|
import { join } from "path";
|
||||||
import { MomSessionManager, MomSettingsManager } from "./context.js";
|
import { MomSessionManager, MomSettingsManager } from "./context.js";
|
||||||
import * as log from "./log.js";
|
import * as log from "./log.js";
|
||||||
|
|
@ -34,8 +34,15 @@ function toSlackTs(): string {
|
||||||
return `${seconds}.${micros.toString().padStart(6, "0")}`;
|
return `${seconds}.${micros.toString().padStart(6, "0")}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface PendingMessage {
|
||||||
|
userName: string;
|
||||||
|
text: string;
|
||||||
|
attachments: { local: string }[];
|
||||||
|
timestamp: number;
|
||||||
|
}
|
||||||
|
|
||||||
export interface AgentRunner {
|
export interface AgentRunner {
|
||||||
run(ctx: SlackContext, channelDir: string, store: ChannelStore): Promise<{ stopReason: string }>;
|
run(ctx: SlackContext, store: ChannelStore, pendingMessages?: PendingMessage[]): Promise<{ stopReason: string }>;
|
||||||
abort(): void;
|
abort(): void;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -254,21 +261,250 @@ function formatToolArgsForSlack(_toolName: string, args: Record<string, unknown>
|
||||||
return lines.join("\n");
|
return lines.join("\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cache for AgentSession and SessionManager per channel
|
// Cache runners per channel
|
||||||
const channelSessions = new Map<string, { session: AgentSession; sessionManager: MomSessionManager }>();
|
const channelRunners = new Map<string, AgentRunner>();
|
||||||
|
|
||||||
export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
|
/**
|
||||||
let currentSession: AgentSession | null = null;
|
* Get or create an AgentRunner for a channel.
|
||||||
|
* Runners are cached - one per channel, persistent across messages.
|
||||||
|
*/
|
||||||
|
export function getOrCreateRunner(sandboxConfig: SandboxConfig, channelId: string, channelDir: string): AgentRunner {
|
||||||
|
const existing = channelRunners.get(channelId);
|
||||||
|
if (existing) return existing;
|
||||||
|
|
||||||
|
const runner = createRunner(sandboxConfig, channelId, channelDir);
|
||||||
|
channelRunners.set(channelId, runner);
|
||||||
|
return runner;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new AgentRunner for a channel.
|
||||||
|
* Sets up the session and subscribes to events once.
|
||||||
|
*/
|
||||||
|
function createRunner(sandboxConfig: SandboxConfig, channelId: string, channelDir: string): AgentRunner {
|
||||||
const executor = createExecutor(sandboxConfig);
|
const executor = createExecutor(sandboxConfig);
|
||||||
|
const workspacePath = executor.getWorkspacePath(channelDir.replace(`/${channelId}`, ""));
|
||||||
|
|
||||||
|
// Create tools
|
||||||
|
const tools = createMomTools(executor);
|
||||||
|
|
||||||
|
// Initial system prompt (will be updated each run with fresh memory/channels/users)
|
||||||
|
const memory = getMemory(channelDir);
|
||||||
|
const systemPrompt = buildSystemPrompt(workspacePath, channelId, memory, sandboxConfig, [], []);
|
||||||
|
|
||||||
|
// Create session manager and settings manager
|
||||||
|
// Pass model info so new sessions get a header written immediately
|
||||||
|
const sessionManager = new MomSessionManager(channelDir, {
|
||||||
|
provider: model.provider,
|
||||||
|
id: model.id,
|
||||||
|
thinkingLevel: "off",
|
||||||
|
});
|
||||||
|
const settingsManager = new MomSettingsManager(join(channelDir, ".."));
|
||||||
|
|
||||||
|
// Create agent
|
||||||
|
const agent = new Agent({
|
||||||
|
initialState: {
|
||||||
|
systemPrompt,
|
||||||
|
model,
|
||||||
|
thinkingLevel: "off",
|
||||||
|
tools,
|
||||||
|
},
|
||||||
|
messageTransformer,
|
||||||
|
transport: new ProviderTransport({
|
||||||
|
getApiKey: async () => getAnthropicApiKey(),
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Load existing messages
|
||||||
|
const loadedSession = sessionManager.loadSession();
|
||||||
|
if (loadedSession.messages.length > 0) {
|
||||||
|
agent.replaceMessages(loadedSession.messages);
|
||||||
|
log.logInfo(`[${channelId}] Loaded ${loadedSession.messages.length} messages from context.jsonl`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create AgentSession wrapper
|
||||||
|
const session = new AgentSession({
|
||||||
|
agent,
|
||||||
|
sessionManager: sessionManager as any,
|
||||||
|
settingsManager: settingsManager as any,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Mutable per-run state - event handler references this
|
||||||
|
const runState = {
|
||||||
|
ctx: null as SlackContext | null,
|
||||||
|
logCtx: null as { channelId: string; userName?: string; channelName?: string } | null,
|
||||||
|
queue: null as {
|
||||||
|
enqueue(fn: () => Promise<void>, errorContext: string): void;
|
||||||
|
enqueueMessage(text: string, target: "main" | "thread", errorContext: string, doLog?: boolean): void;
|
||||||
|
} | null,
|
||||||
|
pendingTools: new Map<string, { toolName: string; args: unknown; startTime: number }>(),
|
||||||
|
totalUsage: {
|
||||||
|
input: 0,
|
||||||
|
output: 0,
|
||||||
|
cacheRead: 0,
|
||||||
|
cacheWrite: 0,
|
||||||
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||||
|
},
|
||||||
|
stopReason: "stop",
|
||||||
|
};
|
||||||
|
|
||||||
|
// Subscribe to events ONCE
|
||||||
|
session.subscribe(async (event) => {
|
||||||
|
// Skip if no active run
|
||||||
|
if (!runState.ctx || !runState.logCtx || !runState.queue) return;
|
||||||
|
|
||||||
|
const { ctx, logCtx, queue, pendingTools } = runState;
|
||||||
|
|
||||||
|
if (event.type === "tool_execution_start") {
|
||||||
|
const agentEvent = event as AgentEvent & { type: "tool_execution_start" };
|
||||||
|
const args = agentEvent.args as { label?: string };
|
||||||
|
const label = args.label || agentEvent.toolName;
|
||||||
|
|
||||||
|
pendingTools.set(agentEvent.toolCallId, {
|
||||||
|
toolName: agentEvent.toolName,
|
||||||
|
args: agentEvent.args,
|
||||||
|
startTime: Date.now(),
|
||||||
|
});
|
||||||
|
|
||||||
|
log.logToolStart(logCtx, agentEvent.toolName, label, agentEvent.args as Record<string, unknown>);
|
||||||
|
queue.enqueue(() => ctx.respond(`_→ ${label}_`, false), "tool label");
|
||||||
|
} else if (event.type === "tool_execution_end") {
|
||||||
|
const agentEvent = event as AgentEvent & { type: "tool_execution_end" };
|
||||||
|
const resultStr = extractToolResultText(agentEvent.result);
|
||||||
|
const pending = pendingTools.get(agentEvent.toolCallId);
|
||||||
|
pendingTools.delete(agentEvent.toolCallId);
|
||||||
|
|
||||||
|
const durationMs = pending ? Date.now() - pending.startTime : 0;
|
||||||
|
|
||||||
|
if (agentEvent.isError) {
|
||||||
|
log.logToolError(logCtx, agentEvent.toolName, durationMs, resultStr);
|
||||||
|
} else {
|
||||||
|
log.logToolSuccess(logCtx, agentEvent.toolName, durationMs, resultStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Post args + result to thread
|
||||||
|
const label = pending?.args ? (pending.args as { label?: string }).label : undefined;
|
||||||
|
const argsFormatted = pending
|
||||||
|
? formatToolArgsForSlack(agentEvent.toolName, pending.args as Record<string, unknown>)
|
||||||
|
: "(args not found)";
|
||||||
|
const duration = (durationMs / 1000).toFixed(1);
|
||||||
|
let threadMessage = `*${agentEvent.isError ? "✗" : "✓"} ${agentEvent.toolName}*`;
|
||||||
|
if (label) threadMessage += `: ${label}`;
|
||||||
|
threadMessage += ` (${duration}s)\n`;
|
||||||
|
if (argsFormatted) threadMessage += "```\n" + argsFormatted + "\n```\n";
|
||||||
|
threadMessage += "*Result:*\n```\n" + resultStr + "\n```";
|
||||||
|
|
||||||
|
queue.enqueueMessage(threadMessage, "thread", "tool result thread", false);
|
||||||
|
|
||||||
|
if (agentEvent.isError) {
|
||||||
|
queue.enqueue(() => ctx.respond(`_Error: ${truncate(resultStr, 200)}_`, false), "tool error");
|
||||||
|
}
|
||||||
|
} else if (event.type === "message_start") {
|
||||||
|
const agentEvent = event as AgentEvent & { type: "message_start" };
|
||||||
|
if (agentEvent.message.role === "assistant") {
|
||||||
|
log.logResponseStart(logCtx);
|
||||||
|
}
|
||||||
|
} else if (event.type === "message_end") {
|
||||||
|
const agentEvent = event as AgentEvent & { type: "message_end" };
|
||||||
|
if (agentEvent.message.role === "assistant") {
|
||||||
|
const assistantMsg = agentEvent.message as any;
|
||||||
|
|
||||||
|
if (assistantMsg.stopReason) {
|
||||||
|
runState.stopReason = assistantMsg.stopReason;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (assistantMsg.usage) {
|
||||||
|
runState.totalUsage.input += assistantMsg.usage.input;
|
||||||
|
runState.totalUsage.output += assistantMsg.usage.output;
|
||||||
|
runState.totalUsage.cacheRead += assistantMsg.usage.cacheRead;
|
||||||
|
runState.totalUsage.cacheWrite += assistantMsg.usage.cacheWrite;
|
||||||
|
runState.totalUsage.cost.input += assistantMsg.usage.cost.input;
|
||||||
|
runState.totalUsage.cost.output += assistantMsg.usage.cost.output;
|
||||||
|
runState.totalUsage.cost.cacheRead += assistantMsg.usage.cost.cacheRead;
|
||||||
|
runState.totalUsage.cost.cacheWrite += assistantMsg.usage.cost.cacheWrite;
|
||||||
|
runState.totalUsage.cost.total += assistantMsg.usage.cost.total;
|
||||||
|
}
|
||||||
|
|
||||||
|
const content = agentEvent.message.content;
|
||||||
|
const thinkingParts: string[] = [];
|
||||||
|
const textParts: string[] = [];
|
||||||
|
for (const part of content) {
|
||||||
|
if (part.type === "thinking") {
|
||||||
|
thinkingParts.push((part as any).thinking);
|
||||||
|
} else if (part.type === "text") {
|
||||||
|
textParts.push((part as any).text);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const text = textParts.join("\n");
|
||||||
|
|
||||||
|
for (const thinking of thinkingParts) {
|
||||||
|
log.logThinking(logCtx, thinking);
|
||||||
|
queue.enqueueMessage(`_${thinking}_`, "main", "thinking main");
|
||||||
|
queue.enqueueMessage(`_${thinking}_`, "thread", "thinking thread", false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (text.trim()) {
|
||||||
|
log.logResponse(logCtx, text);
|
||||||
|
queue.enqueueMessage(text, "main", "response main");
|
||||||
|
queue.enqueueMessage(text, "thread", "response thread", false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (event.type === "auto_compaction_start") {
|
||||||
|
log.logInfo(`Auto-compaction started (reason: ${(event as any).reason})`);
|
||||||
|
queue.enqueue(() => ctx.respond("_Compacting context..._", false), "compaction start");
|
||||||
|
} else if (event.type === "auto_compaction_end") {
|
||||||
|
const compEvent = event as any;
|
||||||
|
if (compEvent.result) {
|
||||||
|
log.logInfo(`Auto-compaction complete: ${compEvent.result.tokensBefore} tokens compacted`);
|
||||||
|
} else if (compEvent.aborted) {
|
||||||
|
log.logInfo("Auto-compaction aborted");
|
||||||
|
}
|
||||||
|
} else if (event.type === "auto_retry_start") {
|
||||||
|
const retryEvent = event as any;
|
||||||
|
log.logWarning(`Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})`, retryEvent.errorMessage);
|
||||||
|
queue.enqueue(
|
||||||
|
() => ctx.respond(`_Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})..._`, false),
|
||||||
|
"retry",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Slack message limit
|
||||||
|
const SLACK_MAX_LENGTH = 40000;
|
||||||
|
const splitForSlack = (text: string): string[] => {
|
||||||
|
if (text.length <= SLACK_MAX_LENGTH) return [text];
|
||||||
|
const parts: string[] = [];
|
||||||
|
let remaining = text;
|
||||||
|
let partNum = 1;
|
||||||
|
while (remaining.length > 0) {
|
||||||
|
const chunk = remaining.substring(0, SLACK_MAX_LENGTH - 50);
|
||||||
|
remaining = remaining.substring(SLACK_MAX_LENGTH - 50);
|
||||||
|
const suffix = remaining.length > 0 ? `\n_(continued ${partNum}...)_` : "";
|
||||||
|
parts.push(chunk + suffix);
|
||||||
|
partNum++;
|
||||||
|
}
|
||||||
|
return parts;
|
||||||
|
};
|
||||||
|
|
||||||
return {
|
return {
|
||||||
async run(ctx: SlackContext, channelDir: string, _store: ChannelStore): Promise<{ stopReason: string }> {
|
async run(
|
||||||
|
ctx: SlackContext,
|
||||||
|
_store: ChannelStore,
|
||||||
|
_pendingMessages?: PendingMessage[],
|
||||||
|
): Promise<{ stopReason: string }> {
|
||||||
// Ensure channel directory exists
|
// Ensure channel directory exists
|
||||||
await mkdir(channelDir, { recursive: true });
|
await mkdir(channelDir, { recursive: true });
|
||||||
|
|
||||||
const channelId = ctx.message.channel;
|
// Reload messages from context.jsonl
|
||||||
const workspacePath = executor.getWorkspacePath(channelDir.replace(`/${channelId}`, ""));
|
// This picks up any messages synced from log.jsonl before this run
|
||||||
|
const reloadedSession = sessionManager.loadSession();
|
||||||
|
if (reloadedSession.messages.length > 0) {
|
||||||
|
agent.replaceMessages(reloadedSession.messages);
|
||||||
|
log.logInfo(`[${channelId}] Reloaded ${reloadedSession.messages.length} messages from context`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update system prompt with fresh memory and channel/user info
|
||||||
const memory = getMemory(channelDir);
|
const memory = getMemory(channelDir);
|
||||||
const systemPrompt = buildSystemPrompt(
|
const systemPrompt = buildSystemPrompt(
|
||||||
workspacePath,
|
workspacePath,
|
||||||
|
|
@ -278,123 +514,36 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
|
||||||
ctx.channels,
|
ctx.channels,
|
||||||
ctx.users,
|
ctx.users,
|
||||||
);
|
);
|
||||||
|
session.agent.setSystemPrompt(systemPrompt);
|
||||||
|
|
||||||
// Debug: log context sizes
|
// Set up file upload function
|
||||||
log.logInfo(`Context sizes - system: ${systemPrompt.length} chars, memory: ${memory.length} chars`);
|
|
||||||
log.logInfo(`Channels: ${ctx.channels.length}, Users: ${ctx.users.length}`);
|
|
||||||
|
|
||||||
// Set up file upload function for the attach tool
|
|
||||||
setUploadFunction(async (filePath: string, title?: string) => {
|
setUploadFunction(async (filePath: string, title?: string) => {
|
||||||
const hostPath = translateToHostPath(filePath, channelDir, workspacePath, channelId);
|
const hostPath = translateToHostPath(filePath, channelDir, workspacePath, channelId);
|
||||||
await ctx.uploadFile(hostPath, title);
|
await ctx.uploadFile(hostPath, title);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Create tools with executor
|
// Reset per-run state
|
||||||
const tools = createMomTools(executor);
|
runState.ctx = ctx;
|
||||||
|
runState.logCtx = {
|
||||||
// Get or create AgentSession for this channel
|
|
||||||
const cached = channelSessions.get(channelId);
|
|
||||||
let session: AgentSession;
|
|
||||||
let sessionManager: MomSessionManager;
|
|
||||||
|
|
||||||
if (!cached) {
|
|
||||||
// Create session manager and settings manager
|
|
||||||
sessionManager = new MomSessionManager(channelDir);
|
|
||||||
const settingsManager = new MomSettingsManager(join(channelDir, ".."));
|
|
||||||
|
|
||||||
// Create agent with proper message transformer for compaction
|
|
||||||
const agent = new Agent({
|
|
||||||
initialState: {
|
|
||||||
systemPrompt,
|
|
||||||
model,
|
|
||||||
thinkingLevel: "off",
|
|
||||||
tools,
|
|
||||||
},
|
|
||||||
messageTransformer,
|
|
||||||
transport: new ProviderTransport({
|
|
||||||
getApiKey: async () => getAnthropicApiKey(),
|
|
||||||
}),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Load existing messages from session
|
|
||||||
const loadedSession = sessionManager.loadSession();
|
|
||||||
if (loadedSession.messages.length > 0) {
|
|
||||||
agent.replaceMessages(loadedSession.messages);
|
|
||||||
log.logInfo(`Loaded ${loadedSession.messages.length} messages from context.jsonl`);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create AgentSession wrapper
|
|
||||||
session = new AgentSession({
|
|
||||||
agent,
|
|
||||||
sessionManager: sessionManager as any, // Type compatibility
|
|
||||||
settingsManager: settingsManager as any, // Type compatibility
|
|
||||||
});
|
|
||||||
|
|
||||||
channelSessions.set(channelId, { session, sessionManager });
|
|
||||||
} else {
|
|
||||||
session = cached.session;
|
|
||||||
sessionManager = cached.sessionManager;
|
|
||||||
|
|
||||||
// Update system prompt for existing session (memory may have changed)
|
|
||||||
session.agent.setSystemPrompt(systemPrompt);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync messages from log.jsonl to context.jsonl
|
|
||||||
// Exclude the current message - it will be added via prompt()
|
|
||||||
sessionManager.syncFromLog(ctx.message.ts);
|
|
||||||
|
|
||||||
currentSession = session;
|
|
||||||
|
|
||||||
// Create logging context
|
|
||||||
const logCtx = {
|
|
||||||
channelId: ctx.message.channel,
|
channelId: ctx.message.channel,
|
||||||
userName: ctx.message.userName,
|
userName: ctx.message.userName,
|
||||||
channelName: ctx.channelName,
|
channelName: ctx.channelName,
|
||||||
};
|
};
|
||||||
|
runState.pendingTools.clear();
|
||||||
// Track pending tool calls to pair args with results and timing
|
runState.totalUsage = {
|
||||||
const pendingTools = new Map<string, { toolName: string; args: unknown; startTime: number }>();
|
|
||||||
|
|
||||||
// Track usage across all assistant messages in this run
|
|
||||||
const totalUsage = {
|
|
||||||
input: 0,
|
input: 0,
|
||||||
output: 0,
|
output: 0,
|
||||||
cacheRead: 0,
|
cacheRead: 0,
|
||||||
cacheWrite: 0,
|
cacheWrite: 0,
|
||||||
cost: {
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
||||||
input: 0,
|
|
||||||
output: 0,
|
|
||||||
cacheRead: 0,
|
|
||||||
cacheWrite: 0,
|
|
||||||
total: 0,
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
runState.stopReason = "stop";
|
||||||
|
|
||||||
// Track stop reason
|
// Create queue for this run
|
||||||
let stopReason = "stop";
|
let queueChain = Promise.resolve();
|
||||||
|
runState.queue = {
|
||||||
// Slack message limit is 40,000 characters
|
|
||||||
const SLACK_MAX_LENGTH = 40000;
|
|
||||||
const splitForSlack = (text: string): string[] => {
|
|
||||||
if (text.length <= SLACK_MAX_LENGTH) return [text];
|
|
||||||
const parts: string[] = [];
|
|
||||||
let remaining = text;
|
|
||||||
let partNum = 1;
|
|
||||||
while (remaining.length > 0) {
|
|
||||||
const chunk = remaining.substring(0, SLACK_MAX_LENGTH - 50);
|
|
||||||
remaining = remaining.substring(SLACK_MAX_LENGTH - 50);
|
|
||||||
const suffix = remaining.length > 0 ? `\n_(continued ${partNum}...)_` : "";
|
|
||||||
parts.push(chunk + suffix);
|
|
||||||
partNum++;
|
|
||||||
}
|
|
||||||
return parts;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Promise queue to ensure ctx.respond/respondInThread calls execute in order
|
|
||||||
const queue = {
|
|
||||||
chain: Promise.resolve(),
|
|
||||||
enqueue(fn: () => Promise<void>, errorContext: string): void {
|
enqueue(fn: () => Promise<void>, errorContext: string): void {
|
||||||
this.chain = this.chain.then(async () => {
|
queueChain = queueChain.then(async () => {
|
||||||
try {
|
try {
|
||||||
await fn();
|
await fn();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|
@ -417,188 +566,74 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
flush(): Promise<void> {
|
|
||||||
return this.chain;
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Subscribe to session events
|
// Log context info
|
||||||
const unsubscribe = session.subscribe(async (event) => {
|
log.logInfo(`Context sizes - system: ${systemPrompt.length} chars, memory: ${memory.length} chars`);
|
||||||
// Handle agent events
|
log.logInfo(`Channels: ${ctx.channels.length}, Users: ${ctx.users.length}`);
|
||||||
if (event.type === "tool_execution_start") {
|
|
||||||
const agentEvent = event as AgentEvent & { type: "tool_execution_start" };
|
|
||||||
const args = agentEvent.args as { label?: string };
|
|
||||||
const label = args.label || agentEvent.toolName;
|
|
||||||
|
|
||||||
pendingTools.set(agentEvent.toolCallId, {
|
// Build user message with username prefix
|
||||||
toolName: agentEvent.toolName,
|
// Format: "[username]: message" so LLM knows who's talking
|
||||||
args: agentEvent.args,
|
let userMessage = `[${ctx.message.userName || "unknown"}]: ${ctx.message.text}`;
|
||||||
startTime: Date.now(),
|
|
||||||
});
|
|
||||||
|
|
||||||
log.logToolStart(logCtx, agentEvent.toolName, label, agentEvent.args as Record<string, unknown>);
|
// Add attachment paths if any
|
||||||
|
if (ctx.message.attachments && ctx.message.attachments.length > 0) {
|
||||||
// NOTE: Tool results are NOT logged to log.jsonl anymore
|
const attachmentPaths = ctx.message.attachments.map((a) => a.local).join("\n");
|
||||||
// They are stored in context.jsonl via AgentSession
|
userMessage += `\n\nAttachments:\n${attachmentPaths}`;
|
||||||
|
|
||||||
queue.enqueue(() => ctx.respond(`_→ ${label}_`, false), "tool label");
|
|
||||||
} else if (event.type === "tool_execution_end") {
|
|
||||||
const agentEvent = event as AgentEvent & { type: "tool_execution_end" };
|
|
||||||
const resultStr = extractToolResultText(agentEvent.result);
|
|
||||||
const pending = pendingTools.get(agentEvent.toolCallId);
|
|
||||||
pendingTools.delete(agentEvent.toolCallId);
|
|
||||||
|
|
||||||
const durationMs = pending ? Date.now() - pending.startTime : 0;
|
|
||||||
|
|
||||||
if (agentEvent.isError) {
|
|
||||||
log.logToolError(logCtx, agentEvent.toolName, durationMs, resultStr);
|
|
||||||
} else {
|
|
||||||
log.logToolSuccess(logCtx, agentEvent.toolName, durationMs, resultStr);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Post args + result to thread (for debugging)
|
|
||||||
const label = pending?.args ? (pending.args as { label?: string }).label : undefined;
|
|
||||||
const argsFormatted = pending
|
|
||||||
? formatToolArgsForSlack(agentEvent.toolName, pending.args as Record<string, unknown>)
|
|
||||||
: "(args not found)";
|
|
||||||
const duration = (durationMs / 1000).toFixed(1);
|
|
||||||
let threadMessage = `*${agentEvent.isError ? "✗" : "✓"} ${agentEvent.toolName}*`;
|
|
||||||
if (label) {
|
|
||||||
threadMessage += `: ${label}`;
|
|
||||||
}
|
|
||||||
threadMessage += ` (${duration}s)\n`;
|
|
||||||
|
|
||||||
if (argsFormatted) {
|
|
||||||
threadMessage += "```\n" + argsFormatted + "\n```\n";
|
|
||||||
}
|
|
||||||
|
|
||||||
threadMessage += "*Result:*\n```\n" + resultStr + "\n```";
|
|
||||||
|
|
||||||
queue.enqueueMessage(threadMessage, "thread", "tool result thread", false);
|
|
||||||
|
|
||||||
if (agentEvent.isError) {
|
|
||||||
queue.enqueue(() => ctx.respond(`_Error: ${truncate(resultStr, 200)}_`, false), "tool error");
|
|
||||||
}
|
|
||||||
} else if (event.type === "message_start") {
|
|
||||||
const agentEvent = event as AgentEvent & { type: "message_start" };
|
|
||||||
if (agentEvent.message.role === "assistant") {
|
|
||||||
log.logResponseStart(logCtx);
|
|
||||||
}
|
|
||||||
} else if (event.type === "message_end") {
|
|
||||||
const agentEvent = event as AgentEvent & { type: "message_end" };
|
|
||||||
if (agentEvent.message.role === "assistant") {
|
|
||||||
const assistantMsg = agentEvent.message as any;
|
|
||||||
|
|
||||||
if (assistantMsg.stopReason) {
|
|
||||||
stopReason = assistantMsg.stopReason;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (assistantMsg.usage) {
|
|
||||||
totalUsage.input += assistantMsg.usage.input;
|
|
||||||
totalUsage.output += assistantMsg.usage.output;
|
|
||||||
totalUsage.cacheRead += assistantMsg.usage.cacheRead;
|
|
||||||
totalUsage.cacheWrite += assistantMsg.usage.cacheWrite;
|
|
||||||
totalUsage.cost.input += assistantMsg.usage.cost.input;
|
|
||||||
totalUsage.cost.output += assistantMsg.usage.cost.output;
|
|
||||||
totalUsage.cost.cacheRead += assistantMsg.usage.cost.cacheRead;
|
|
||||||
totalUsage.cost.cacheWrite += assistantMsg.usage.cost.cacheWrite;
|
|
||||||
totalUsage.cost.total += assistantMsg.usage.cost.total;
|
|
||||||
}
|
|
||||||
|
|
||||||
const content = agentEvent.message.content;
|
|
||||||
const thinkingParts: string[] = [];
|
|
||||||
const textParts: string[] = [];
|
|
||||||
for (const part of content) {
|
|
||||||
if (part.type === "thinking") {
|
|
||||||
thinkingParts.push((part as any).thinking);
|
|
||||||
} else if (part.type === "text") {
|
|
||||||
textParts.push((part as any).text);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const text = textParts.join("\n");
|
|
||||||
|
|
||||||
for (const thinking of thinkingParts) {
|
|
||||||
log.logThinking(logCtx, thinking);
|
|
||||||
queue.enqueueMessage(`_${thinking}_`, "main", "thinking main");
|
|
||||||
queue.enqueueMessage(`_${thinking}_`, "thread", "thinking thread", false);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (text.trim()) {
|
|
||||||
log.logResponse(logCtx, text);
|
|
||||||
queue.enqueueMessage(text, "main", "response main");
|
|
||||||
queue.enqueueMessage(text, "thread", "response thread", false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (event.type === "auto_compaction_start") {
|
|
||||||
log.logInfo(`Auto-compaction started (reason: ${(event as any).reason})`);
|
|
||||||
queue.enqueue(() => ctx.respond("_Compacting context..._", false), "compaction start");
|
|
||||||
} else if (event.type === "auto_compaction_end") {
|
|
||||||
const compEvent = event as any;
|
|
||||||
if (compEvent.result) {
|
|
||||||
log.logInfo(`Auto-compaction complete: ${compEvent.result.tokensBefore} tokens compacted`);
|
|
||||||
} else if (compEvent.aborted) {
|
|
||||||
log.logInfo("Auto-compaction aborted");
|
|
||||||
}
|
|
||||||
} else if (event.type === "auto_retry_start") {
|
|
||||||
const retryEvent = event as any;
|
|
||||||
log.logWarning(`Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})`, retryEvent.errorMessage);
|
|
||||||
queue.enqueue(
|
|
||||||
() => ctx.respond(`_Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})..._`, false),
|
|
||||||
"retry",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Build user message from Slack context
|
|
||||||
// Note: User message is already logged to log.jsonl by Slack event handler
|
|
||||||
const userMessage = ctx.message.text;
|
|
||||||
|
|
||||||
// Send prompt to agent session
|
|
||||||
await session.prompt(userMessage);
|
|
||||||
|
|
||||||
// Wait for all queued Slack messages
|
|
||||||
await queue.flush();
|
|
||||||
|
|
||||||
// Get final assistant text and update main message
|
|
||||||
const messages = session.messages;
|
|
||||||
const lastAssistant = messages.filter((m) => m.role === "assistant").pop();
|
|
||||||
const finalText =
|
|
||||||
lastAssistant?.content
|
|
||||||
.filter((c): c is { type: "text"; text: string } => c.type === "text")
|
|
||||||
.map((c) => c.text)
|
|
||||||
.join("\n") || "";
|
|
||||||
|
|
||||||
if (finalText.trim()) {
|
|
||||||
// Note: Bot response is logged via ctx.respond() in the event handler
|
|
||||||
try {
|
|
||||||
const mainText =
|
|
||||||
finalText.length > SLACK_MAX_LENGTH
|
|
||||||
? finalText.substring(0, SLACK_MAX_LENGTH - 50) + "\n\n_(see thread for full response)_"
|
|
||||||
: finalText;
|
|
||||||
await ctx.replaceMessage(mainText);
|
|
||||||
} catch (err) {
|
|
||||||
const errMsg = err instanceof Error ? err.message : String(err);
|
|
||||||
log.logWarning("Failed to replace message with final text", errMsg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log usage summary
|
|
||||||
if (totalUsage.cost.total > 0) {
|
|
||||||
const summary = log.logUsageSummary(logCtx, totalUsage);
|
|
||||||
queue.enqueue(() => ctx.respondInThread(summary), "usage summary");
|
|
||||||
await queue.flush();
|
|
||||||
}
|
|
||||||
|
|
||||||
return { stopReason };
|
|
||||||
} finally {
|
|
||||||
unsubscribe();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Debug: write context to last_prompt.jsonl
|
||||||
|
const debugContext = {
|
||||||
|
systemPrompt,
|
||||||
|
messages: session.messages,
|
||||||
|
newUserMessage: userMessage,
|
||||||
|
};
|
||||||
|
await writeFile(join(channelDir, "last_prompt.jsonl"), JSON.stringify(debugContext, null, 2));
|
||||||
|
|
||||||
|
await session.prompt(userMessage);
|
||||||
|
|
||||||
|
// Wait for queued messages
|
||||||
|
await queueChain;
|
||||||
|
|
||||||
|
// Final message update
|
||||||
|
const messages = session.messages;
|
||||||
|
const lastAssistant = messages.filter((m) => m.role === "assistant").pop();
|
||||||
|
const finalText =
|
||||||
|
lastAssistant?.content
|
||||||
|
.filter((c): c is { type: "text"; text: string } => c.type === "text")
|
||||||
|
.map((c) => c.text)
|
||||||
|
.join("\n") || "";
|
||||||
|
|
||||||
|
if (finalText.trim()) {
|
||||||
|
try {
|
||||||
|
const mainText =
|
||||||
|
finalText.length > SLACK_MAX_LENGTH
|
||||||
|
? finalText.substring(0, SLACK_MAX_LENGTH - 50) + "\n\n_(see thread for full response)_"
|
||||||
|
: finalText;
|
||||||
|
await ctx.replaceMessage(mainText);
|
||||||
|
} catch (err) {
|
||||||
|
const errMsg = err instanceof Error ? err.message : String(err);
|
||||||
|
log.logWarning("Failed to replace message with final text", errMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log usage summary
|
||||||
|
if (runState.totalUsage.cost.total > 0) {
|
||||||
|
const summary = log.logUsageSummary(runState.logCtx!, runState.totalUsage);
|
||||||
|
runState.queue.enqueue(() => ctx.respondInThread(summary), "usage summary");
|
||||||
|
await queueChain;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear run state
|
||||||
|
runState.ctx = null;
|
||||||
|
runState.logCtx = null;
|
||||||
|
runState.queue = null;
|
||||||
|
|
||||||
|
return { stopReason: runState.stopReason };
|
||||||
},
|
},
|
||||||
|
|
||||||
abort(): void {
|
abort(): void {
|
||||||
currentSession?.abort();
|
session.abort();
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,7 @@ export class MomSessionManager {
|
||||||
private inMemoryEntries: SessionEntry[] = [];
|
private inMemoryEntries: SessionEntry[] = [];
|
||||||
private pendingEntries: SessionEntry[] = [];
|
private pendingEntries: SessionEntry[] = [];
|
||||||
|
|
||||||
constructor(channelDir: string) {
|
constructor(channelDir: string, initialModel?: { provider: string; id: string; thinkingLevel?: string }) {
|
||||||
this.channelDir = channelDir;
|
this.channelDir = channelDir;
|
||||||
this.contextFile = join(channelDir, "context.jsonl");
|
this.contextFile = join(channelDir, "context.jsonl");
|
||||||
this.logFile = join(channelDir, "log.jsonl");
|
this.logFile = join(channelDir, "log.jsonl");
|
||||||
|
|
@ -68,11 +68,33 @@ export class MomSessionManager {
|
||||||
this.sessionId = this.extractSessionId() || uuidv4();
|
this.sessionId = this.extractSessionId() || uuidv4();
|
||||||
this.sessionInitialized = this.inMemoryEntries.length > 0;
|
this.sessionInitialized = this.inMemoryEntries.length > 0;
|
||||||
} else {
|
} else {
|
||||||
|
// New session - write header immediately
|
||||||
this.sessionId = uuidv4();
|
this.sessionId = uuidv4();
|
||||||
|
if (initialModel) {
|
||||||
|
this.writeSessionHeader(initialModel);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Note: syncFromLog() is called explicitly from agent.ts with excludeTimestamp
|
// Note: syncFromLog() is called explicitly from agent.ts with excludeTimestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Write session header to file (called on new session creation) */
|
||||||
|
private writeSessionHeader(model: { provider: string; id: string; thinkingLevel?: string }): void {
|
||||||
|
this.sessionInitialized = true;
|
||||||
|
|
||||||
|
const entry: SessionHeader = {
|
||||||
|
type: "session",
|
||||||
|
id: this.sessionId,
|
||||||
|
timestamp: new Date().toISOString(),
|
||||||
|
cwd: this.channelDir,
|
||||||
|
provider: model.provider,
|
||||||
|
modelId: model.id,
|
||||||
|
thinkingLevel: model.thinkingLevel || "off",
|
||||||
|
};
|
||||||
|
|
||||||
|
this.inMemoryEntries.push(entry);
|
||||||
|
appendFileSync(this.contextFile, JSON.stringify(entry) + "\n");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sync user messages from log.jsonl that aren't in context.jsonl.
|
* Sync user messages from log.jsonl that aren't in context.jsonl.
|
||||||
*
|
*
|
||||||
|
|
@ -84,18 +106,48 @@ export class MomSessionManager {
|
||||||
*
|
*
|
||||||
* Channel chatter is formatted as "[username]: message" to distinguish from direct @mentions.
|
* Channel chatter is formatted as "[username]: message" to distinguish from direct @mentions.
|
||||||
*
|
*
|
||||||
* Called automatically on construction and should be called before each agent run.
|
* Called before each agent run.
|
||||||
*
|
*
|
||||||
* @param excludeTimestamp Optional timestamp to exclude (for the current @mention being processed)
|
* @param excludeSlackTs Slack timestamp of current message (will be added via prompt(), not sync)
|
||||||
*/
|
*/
|
||||||
syncFromLog(excludeTimestamp?: string): void {
|
syncFromLog(excludeSlackTs?: string): void {
|
||||||
if (!existsSync(this.logFile)) return;
|
if (!existsSync(this.logFile)) return;
|
||||||
|
|
||||||
// Get timestamps of messages already in context
|
// Build set of Slack timestamps already in context
|
||||||
const contextTimestamps = new Set<string>();
|
// We store slackTs in the message content or can extract from formatted messages
|
||||||
|
// For messages synced from log, we use the log's date as the entry timestamp
|
||||||
|
// For messages added via prompt(), they have different timestamps
|
||||||
|
// So we need to match by content OR by stored slackTs
|
||||||
|
const contextSlackTimestamps = new Set<string>();
|
||||||
|
const contextMessageTexts = new Set<string>();
|
||||||
|
|
||||||
for (const entry of this.inMemoryEntries) {
|
for (const entry of this.inMemoryEntries) {
|
||||||
if (entry.type === "message") {
|
if (entry.type === "message") {
|
||||||
contextTimestamps.add(entry.timestamp);
|
const msgEntry = entry as SessionMessageEntry;
|
||||||
|
// Store the entry timestamp (which is the log date for synced messages)
|
||||||
|
contextSlackTimestamps.add(entry.timestamp);
|
||||||
|
|
||||||
|
// Also store message text to catch duplicates added via prompt()
|
||||||
|
// AppMessage has different shapes, check for content property
|
||||||
|
const msg = msgEntry.message as { role: string; content?: unknown };
|
||||||
|
if (msg.role === "user" && msg.content !== undefined) {
|
||||||
|
const content = msg.content;
|
||||||
|
if (typeof content === "string") {
|
||||||
|
contextMessageTexts.add(content);
|
||||||
|
} else if (Array.isArray(content)) {
|
||||||
|
for (const part of content) {
|
||||||
|
if (
|
||||||
|
typeof part === "object" &&
|
||||||
|
part !== null &&
|
||||||
|
"type" in part &&
|
||||||
|
part.type === "text" &&
|
||||||
|
"text" in part
|
||||||
|
) {
|
||||||
|
contextMessageTexts.add((part as { type: "text"; text: string }).text);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -112,34 +164,39 @@ export class MomSessionManager {
|
||||||
isBot?: boolean;
|
isBot?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
const newMessages: Array<{ timestamp: string; message: AppMessage }> = [];
|
const newMessages: Array<{ timestamp: string; slackTs: string; message: AppMessage }> = [];
|
||||||
|
|
||||||
for (const line of logLines) {
|
for (const line of logLines) {
|
||||||
try {
|
try {
|
||||||
const logMsg: LogMessage = JSON.parse(line);
|
const logMsg: LogMessage = JSON.parse(line);
|
||||||
|
|
||||||
// Use date for context timestamp (consistent key)
|
const slackTs = logMsg.ts;
|
||||||
const ts = logMsg.date || logMsg.ts;
|
const date = logMsg.date;
|
||||||
if (!ts) continue;
|
if (!slackTs || !date) continue;
|
||||||
|
|
||||||
// Skip if already in context
|
|
||||||
if (contextTimestamps.has(ts)) continue;
|
|
||||||
|
|
||||||
// Skip the current message being processed (will be added via prompt())
|
// Skip the current message being processed (will be added via prompt())
|
||||||
// Compare against Slack ts since that's what ctx.message.ts provides
|
if (excludeSlackTs && slackTs === excludeSlackTs) continue;
|
||||||
if (excludeTimestamp && logMsg.ts === excludeTimestamp) continue;
|
|
||||||
|
|
||||||
// Skip bot messages - added through agent flow
|
// Skip bot messages - added through agent flow
|
||||||
if (logMsg.isBot) continue;
|
if (logMsg.isBot) continue;
|
||||||
|
|
||||||
const msgTime = new Date(ts).getTime() || Date.now();
|
// Skip if this date is already in context (was synced before)
|
||||||
|
if (contextSlackTimestamps.has(date)) continue;
|
||||||
|
|
||||||
|
// Build the message text as it would appear in context
|
||||||
|
const messageText = `[${logMsg.userName || logMsg.user || "unknown"}]: ${logMsg.text || ""}`;
|
||||||
|
|
||||||
|
// Skip if this exact message text is already in context (added via prompt())
|
||||||
|
if (contextMessageTexts.has(messageText)) continue;
|
||||||
|
|
||||||
|
const msgTime = new Date(date).getTime() || Date.now();
|
||||||
const userMessage: AppMessage = {
|
const userMessage: AppMessage = {
|
||||||
role: "user",
|
role: "user",
|
||||||
content: `[${logMsg.userName || logMsg.user || "unknown"}]: ${logMsg.text || ""}`,
|
content: messageText,
|
||||||
timestamp: msgTime,
|
timestamp: msgTime,
|
||||||
};
|
};
|
||||||
|
|
||||||
newMessages.push({ timestamp: ts, message: userMessage });
|
newMessages.push({ timestamp: date, slackTs, message: userMessage });
|
||||||
} catch {
|
} catch {
|
||||||
// Skip malformed lines
|
// Skip malformed lines
|
||||||
}
|
}
|
||||||
|
|
@ -153,15 +210,13 @@ export class MomSessionManager {
|
||||||
for (const { timestamp, message } of newMessages) {
|
for (const { timestamp, message } of newMessages) {
|
||||||
const entry: SessionMessageEntry = {
|
const entry: SessionMessageEntry = {
|
||||||
type: "message",
|
type: "message",
|
||||||
timestamp,
|
timestamp, // Use log date as entry timestamp for consistent deduplication
|
||||||
message,
|
message,
|
||||||
};
|
};
|
||||||
|
|
||||||
this.inMemoryEntries.push(entry);
|
this.inMemoryEntries.push(entry);
|
||||||
appendFileSync(this.contextFile, JSON.stringify(entry) + "\n");
|
appendFileSync(this.contextFile, JSON.stringify(entry) + "\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync complete - newMessages.length messages added
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private extractSessionId(): string | null {
|
private extractSessionId(): string | null {
|
||||||
|
|
@ -483,3 +538,118 @@ export class MomSettingsManager {
|
||||||
return 30000;
|
return 30000;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Sync log.jsonl to context.jsonl
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sync user messages from log.jsonl to context.jsonl.
|
||||||
|
*
|
||||||
|
* This ensures that messages logged while mom wasn't running (channel chatter,
|
||||||
|
* backfilled messages, messages while busy) are added to the LLM context.
|
||||||
|
*
|
||||||
|
* @param channelDir - Path to channel directory
|
||||||
|
* @param excludeAfterTs - Don't sync messages with ts >= this value (they'll be handled by agent)
|
||||||
|
* @returns Number of messages synced
|
||||||
|
*/
|
||||||
|
export function syncLogToContext(channelDir: string, excludeAfterTs?: string): number {
|
||||||
|
const logFile = join(channelDir, "log.jsonl");
|
||||||
|
const contextFile = join(channelDir, "context.jsonl");
|
||||||
|
|
||||||
|
if (!existsSync(logFile)) return 0;
|
||||||
|
|
||||||
|
// Read all user messages from log.jsonl
|
||||||
|
const logContent = readFileSync(logFile, "utf-8");
|
||||||
|
const logLines = logContent.trim().split("\n").filter(Boolean);
|
||||||
|
|
||||||
|
interface LogEntry {
|
||||||
|
ts: string;
|
||||||
|
user: string;
|
||||||
|
userName?: string;
|
||||||
|
text: string;
|
||||||
|
isBot: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
const logMessages: LogEntry[] = [];
|
||||||
|
for (const line of logLines) {
|
||||||
|
try {
|
||||||
|
const entry = JSON.parse(line) as LogEntry;
|
||||||
|
// Only sync user messages (not bot responses)
|
||||||
|
if (!entry.isBot && entry.ts && entry.text) {
|
||||||
|
// Skip if >= excludeAfterTs
|
||||||
|
if (excludeAfterTs && entry.ts >= excludeAfterTs) continue;
|
||||||
|
logMessages.push(entry);
|
||||||
|
}
|
||||||
|
} catch {}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (logMessages.length === 0) return 0;
|
||||||
|
|
||||||
|
// Read existing timestamps from context.jsonl
|
||||||
|
const existingTs = new Set<string>();
|
||||||
|
if (existsSync(contextFile)) {
|
||||||
|
const contextContent = readFileSync(contextFile, "utf-8");
|
||||||
|
const contextLines = contextContent.trim().split("\n").filter(Boolean);
|
||||||
|
for (const line of contextLines) {
|
||||||
|
try {
|
||||||
|
const entry = JSON.parse(line);
|
||||||
|
if (entry.type === "message" && entry.message?.role === "user" && entry.message?.timestamp) {
|
||||||
|
// Extract ts from timestamp (ms -> slack ts format for comparison)
|
||||||
|
// We store the original slack ts in a way we can recover
|
||||||
|
// Actually, let's just check by content match since ts formats differ
|
||||||
|
}
|
||||||
|
} catch {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// For deduplication, we need to track what's already in context
|
||||||
|
// Read context and extract user message content
|
||||||
|
const existingMessages = new Set<string>();
|
||||||
|
if (existsSync(contextFile)) {
|
||||||
|
const contextContent = readFileSync(contextFile, "utf-8");
|
||||||
|
const contextLines = contextContent.trim().split("\n").filter(Boolean);
|
||||||
|
for (const line of contextLines) {
|
||||||
|
try {
|
||||||
|
const entry = JSON.parse(line);
|
||||||
|
if (entry.type === "message" && entry.message?.role === "user") {
|
||||||
|
const content =
|
||||||
|
typeof entry.message.content === "string" ? entry.message.content : entry.message.content?.[0]?.text;
|
||||||
|
if (content) existingMessages.add(content);
|
||||||
|
}
|
||||||
|
} catch {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add missing messages to context.jsonl
|
||||||
|
let syncedCount = 0;
|
||||||
|
for (const msg of logMessages) {
|
||||||
|
const userName = msg.userName || msg.user;
|
||||||
|
const content = `[${userName}]: ${msg.text}`;
|
||||||
|
|
||||||
|
// Skip if already in context
|
||||||
|
if (existingMessages.has(content)) continue;
|
||||||
|
|
||||||
|
const timestamp = Math.floor(parseFloat(msg.ts) * 1000);
|
||||||
|
const entry = {
|
||||||
|
type: "message",
|
||||||
|
timestamp: new Date(timestamp).toISOString(),
|
||||||
|
message: {
|
||||||
|
role: "user",
|
||||||
|
content,
|
||||||
|
timestamp,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// Ensure directory exists
|
||||||
|
if (!existsSync(channelDir)) {
|
||||||
|
mkdirSync(channelDir, { recursive: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
appendFileSync(contextFile, JSON.stringify(entry) + "\n");
|
||||||
|
existingMessages.add(content); // Track to avoid duplicates within this sync
|
||||||
|
syncedCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return syncedCount;
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,22 @@
|
||||||
#!/usr/bin/env node
|
#!/usr/bin/env node
|
||||||
|
|
||||||
import { join, resolve } from "path";
|
import { join, resolve } from "path";
|
||||||
import { type AgentRunner, createAgentRunner } from "./agent.js";
|
import { type AgentRunner, getOrCreateRunner } from "./agent.js";
|
||||||
|
import { syncLogToContext } from "./context.js";
|
||||||
import * as log from "./log.js";
|
import * as log from "./log.js";
|
||||||
import { parseSandboxArg, type SandboxConfig, validateSandbox } from "./sandbox.js";
|
import { parseSandboxArg, type SandboxConfig, validateSandbox } from "./sandbox.js";
|
||||||
import { MomBot, type SlackContext } from "./slack.js";
|
import { type MomHandler, type SlackBot, SlackBot as SlackBotClass, type SlackEvent } from "./slack.js";
|
||||||
|
import { ChannelStore } from "./store.js";
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Config
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
const MOM_SLACK_APP_TOKEN = process.env.MOM_SLACK_APP_TOKEN;
|
const MOM_SLACK_APP_TOKEN = process.env.MOM_SLACK_APP_TOKEN;
|
||||||
const MOM_SLACK_BOT_TOKEN = process.env.MOM_SLACK_BOT_TOKEN;
|
const MOM_SLACK_BOT_TOKEN = process.env.MOM_SLACK_BOT_TOKEN;
|
||||||
const ANTHROPIC_API_KEY = process.env.ANTHROPIC_API_KEY;
|
const ANTHROPIC_API_KEY = process.env.ANTHROPIC_API_KEY;
|
||||||
const ANTHROPIC_OAUTH_TOKEN = process.env.ANTHROPIC_OAUTH_TOKEN;
|
const ANTHROPIC_OAUTH_TOKEN = process.env.ANTHROPIC_OAUTH_TOKEN;
|
||||||
|
|
||||||
// Parse command line arguments
|
|
||||||
function parseArgs(): { workingDir: string; sandbox: SandboxConfig } {
|
function parseArgs(): { workingDir: string; sandbox: SandboxConfig } {
|
||||||
const args = process.argv.slice(2);
|
const args = process.argv.slice(2);
|
||||||
let sandbox: SandboxConfig = { type: "host" };
|
let sandbox: SandboxConfig = { type: "host" };
|
||||||
|
|
@ -22,30 +27,14 @@ function parseArgs(): { workingDir: string; sandbox: SandboxConfig } {
|
||||||
if (arg.startsWith("--sandbox=")) {
|
if (arg.startsWith("--sandbox=")) {
|
||||||
sandbox = parseSandboxArg(arg.slice("--sandbox=".length));
|
sandbox = parseSandboxArg(arg.slice("--sandbox=".length));
|
||||||
} else if (arg === "--sandbox") {
|
} else if (arg === "--sandbox") {
|
||||||
const next = args[++i];
|
sandbox = parseSandboxArg(args[++i] || "");
|
||||||
if (!next) {
|
|
||||||
console.error("Error: --sandbox requires a value (host or docker:<container-name>)");
|
|
||||||
process.exit(1);
|
|
||||||
}
|
|
||||||
sandbox = parseSandboxArg(next);
|
|
||||||
} else if (!arg.startsWith("-")) {
|
} else if (!arg.startsWith("-")) {
|
||||||
workingDir = arg;
|
workingDir = arg;
|
||||||
} else {
|
|
||||||
console.error(`Unknown option: ${arg}`);
|
|
||||||
process.exit(1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!workingDir) {
|
if (!workingDir) {
|
||||||
console.error("Usage: mom [--sandbox=host|docker:<container-name>] <working-directory>");
|
console.error("Usage: mom [--sandbox=host|docker:<name>] <working-directory>");
|
||||||
console.error("");
|
|
||||||
console.error("Options:");
|
|
||||||
console.error(" --sandbox=host Run tools directly on host (default)");
|
|
||||||
console.error(" --sandbox=docker:<container> Run tools in Docker container");
|
|
||||||
console.error("");
|
|
||||||
console.error("Examples:");
|
|
||||||
console.error(" mom ./data");
|
|
||||||
console.error(" mom --sandbox=docker:mom-sandbox ./data");
|
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -54,101 +43,210 @@ function parseArgs(): { workingDir: string; sandbox: SandboxConfig } {
|
||||||
|
|
||||||
const { workingDir, sandbox } = parseArgs();
|
const { workingDir, sandbox } = parseArgs();
|
||||||
|
|
||||||
log.logStartup(workingDir, sandbox.type === "host" ? "host" : `docker:${sandbox.container}`);
|
|
||||||
|
|
||||||
if (!MOM_SLACK_APP_TOKEN || !MOM_SLACK_BOT_TOKEN || (!ANTHROPIC_API_KEY && !ANTHROPIC_OAUTH_TOKEN)) {
|
if (!MOM_SLACK_APP_TOKEN || !MOM_SLACK_BOT_TOKEN || (!ANTHROPIC_API_KEY && !ANTHROPIC_OAUTH_TOKEN)) {
|
||||||
console.error("Missing required environment variables:");
|
console.error("Missing env: MOM_SLACK_APP_TOKEN, MOM_SLACK_BOT_TOKEN, ANTHROPIC_API_KEY or ANTHROPIC_OAUTH_TOKEN");
|
||||||
if (!MOM_SLACK_APP_TOKEN) console.error(" - MOM_SLACK_APP_TOKEN (xapp-...)");
|
|
||||||
if (!MOM_SLACK_BOT_TOKEN) console.error(" - MOM_SLACK_BOT_TOKEN (xoxb-...)");
|
|
||||||
if (!ANTHROPIC_API_KEY && !ANTHROPIC_OAUTH_TOKEN) console.error(" - ANTHROPIC_API_KEY or ANTHROPIC_OAUTH_TOKEN");
|
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate sandbox configuration
|
|
||||||
await validateSandbox(sandbox);
|
await validateSandbox(sandbox);
|
||||||
|
|
||||||
// Track active agent runs per channel
|
// ============================================================================
|
||||||
const activeRuns = new Map<string, { runner: AgentRunner; context: SlackContext; stopContext?: SlackContext }>();
|
// State (per channel)
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
async function handleMessage(ctx: SlackContext, _source: "channel" | "dm"): Promise<void> {
|
interface ChannelState {
|
||||||
const channelId = ctx.message.channel;
|
running: boolean;
|
||||||
const messageText = ctx.message.text.toLowerCase().trim();
|
runner: AgentRunner;
|
||||||
|
store: ChannelStore;
|
||||||
const logCtx = {
|
stopRequested: boolean;
|
||||||
channelId: ctx.message.channel,
|
stopMessageTs?: string;
|
||||||
userName: ctx.message.userName,
|
|
||||||
channelName: ctx.channelName,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Check for stop command
|
|
||||||
if (messageText === "stop") {
|
|
||||||
const active = activeRuns.get(channelId);
|
|
||||||
if (active) {
|
|
||||||
log.logStopRequest(logCtx);
|
|
||||||
// Post a NEW message saying "Stopping..."
|
|
||||||
await ctx.respond("_Stopping..._");
|
|
||||||
// Store this context to update it to "Stopped" later
|
|
||||||
active.stopContext = ctx;
|
|
||||||
// Abort the runner
|
|
||||||
active.runner.abort();
|
|
||||||
} else {
|
|
||||||
await ctx.respond("_Nothing running._");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if already running in this channel
|
|
||||||
if (activeRuns.has(channelId)) {
|
|
||||||
await ctx.respond("_Already working on something. Say `@mom stop` to cancel._");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
log.logUserMessage(logCtx, ctx.message.text);
|
|
||||||
const channelDir = join(workingDir, channelId);
|
|
||||||
|
|
||||||
const runner = createAgentRunner(sandbox);
|
|
||||||
activeRuns.set(channelId, { runner, context: ctx });
|
|
||||||
|
|
||||||
await ctx.setTyping(true);
|
|
||||||
await ctx.setWorking(true);
|
|
||||||
|
|
||||||
const result = await runner.run(ctx, channelDir, ctx.store);
|
|
||||||
|
|
||||||
// Remove working indicator
|
|
||||||
await ctx.setWorking(false);
|
|
||||||
|
|
||||||
// Handle different stop reasons
|
|
||||||
const active = activeRuns.get(channelId);
|
|
||||||
if (result.stopReason === "aborted") {
|
|
||||||
// Replace the STOP message with "Stopped"
|
|
||||||
if (active?.stopContext) {
|
|
||||||
await active.stopContext.setWorking(false);
|
|
||||||
await active.stopContext.replaceMessage("_Stopped_");
|
|
||||||
}
|
|
||||||
} else if (result.stopReason === "error") {
|
|
||||||
// Agent encountered an error
|
|
||||||
log.logAgentError(logCtx, "Agent stopped with error");
|
|
||||||
}
|
|
||||||
// "stop", "length", "toolUse" are normal completions - nothing extra to do
|
|
||||||
|
|
||||||
activeRuns.delete(channelId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const bot = new MomBot(
|
const channelStates = new Map<string, ChannelState>();
|
||||||
{
|
|
||||||
async onChannelMention(ctx) {
|
function getState(channelId: string): ChannelState {
|
||||||
await handleMessage(ctx, "channel");
|
let state = channelStates.get(channelId);
|
||||||
|
if (!state) {
|
||||||
|
const channelDir = join(workingDir, channelId);
|
||||||
|
state = {
|
||||||
|
running: false,
|
||||||
|
runner: getOrCreateRunner(sandbox, channelId, channelDir),
|
||||||
|
store: new ChannelStore({ workingDir, botToken: MOM_SLACK_BOT_TOKEN! }),
|
||||||
|
stopRequested: false,
|
||||||
|
};
|
||||||
|
channelStates.set(channelId, state);
|
||||||
|
}
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Create SlackContext adapter
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelState) {
|
||||||
|
let messageTs: string | null = null;
|
||||||
|
let accumulatedText = "";
|
||||||
|
let isWorking = true;
|
||||||
|
const workingIndicator = " ...";
|
||||||
|
let updatePromise = Promise.resolve();
|
||||||
|
|
||||||
|
const user = slack.getUser(event.user);
|
||||||
|
|
||||||
|
return {
|
||||||
|
message: {
|
||||||
|
text: event.text,
|
||||||
|
rawText: event.text,
|
||||||
|
user: event.user,
|
||||||
|
userName: user?.userName,
|
||||||
|
channel: event.channel,
|
||||||
|
ts: event.ts,
|
||||||
|
attachments: [],
|
||||||
|
},
|
||||||
|
channelName: slack.getChannel(event.channel)?.name,
|
||||||
|
store: state.store,
|
||||||
|
channels: slack.getAllChannels().map((c) => ({ id: c.id, name: c.name })),
|
||||||
|
users: slack.getAllUsers().map((u) => ({ id: u.id, userName: u.userName, displayName: u.displayName })),
|
||||||
|
|
||||||
|
respond: async (text: string, shouldLog = true) => {
|
||||||
|
updatePromise = updatePromise.then(async () => {
|
||||||
|
accumulatedText = accumulatedText ? accumulatedText + "\n" + text : text;
|
||||||
|
const displayText = isWorking ? accumulatedText + workingIndicator : accumulatedText;
|
||||||
|
|
||||||
|
if (messageTs) {
|
||||||
|
await slack.updateMessage(event.channel, messageTs, displayText);
|
||||||
|
} else {
|
||||||
|
messageTs = await slack.postMessage(event.channel, displayText);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (shouldLog && messageTs) {
|
||||||
|
slack.logBotResponse(event.channel, text, messageTs);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
await updatePromise;
|
||||||
},
|
},
|
||||||
|
|
||||||
async onDirectMessage(ctx) {
|
replaceMessage: async (text: string) => {
|
||||||
await handleMessage(ctx, "dm");
|
updatePromise = updatePromise.then(async () => {
|
||||||
|
accumulatedText = text;
|
||||||
|
const displayText = isWorking ? accumulatedText + workingIndicator : accumulatedText;
|
||||||
|
if (messageTs) {
|
||||||
|
await slack.updateMessage(event.channel, messageTs, displayText);
|
||||||
|
} else {
|
||||||
|
messageTs = await slack.postMessage(event.channel, displayText);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
await updatePromise;
|
||||||
},
|
},
|
||||||
|
|
||||||
|
respondInThread: async (text: string) => {
|
||||||
|
updatePromise = updatePromise.then(async () => {
|
||||||
|
if (messageTs) {
|
||||||
|
await slack.postInThread(event.channel, messageTs, text);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
await updatePromise;
|
||||||
|
},
|
||||||
|
|
||||||
|
setTyping: async (isTyping: boolean) => {
|
||||||
|
if (isTyping && !messageTs) {
|
||||||
|
accumulatedText = "_Thinking_";
|
||||||
|
messageTs = await slack.postMessage(event.channel, accumulatedText + workingIndicator);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
uploadFile: async (filePath: string, title?: string) => {
|
||||||
|
await slack.uploadFile(event.channel, filePath, title);
|
||||||
|
},
|
||||||
|
|
||||||
|
setWorking: async (working: boolean) => {
|
||||||
|
updatePromise = updatePromise.then(async () => {
|
||||||
|
isWorking = working;
|
||||||
|
if (messageTs) {
|
||||||
|
const displayText = isWorking ? accumulatedText + workingIndicator : accumulatedText;
|
||||||
|
await slack.updateMessage(event.channel, messageTs, displayText);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
await updatePromise;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Handler
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
const handler: MomHandler = {
|
||||||
|
isRunning(channelId: string): boolean {
|
||||||
|
const state = channelStates.get(channelId);
|
||||||
|
return state?.running ?? false;
|
||||||
},
|
},
|
||||||
{
|
|
||||||
appToken: MOM_SLACK_APP_TOKEN,
|
async handleStop(channelId: string, slack: SlackBot): Promise<void> {
|
||||||
botToken: MOM_SLACK_BOT_TOKEN,
|
const state = channelStates.get(channelId);
|
||||||
workingDir,
|
if (state?.running) {
|
||||||
|
state.stopRequested = true;
|
||||||
|
state.runner.abort();
|
||||||
|
const ts = await slack.postMessage(channelId, "_Stopping..._");
|
||||||
|
state.stopMessageTs = ts; // Save for updating later
|
||||||
|
} else {
|
||||||
|
await slack.postMessage(channelId, "_Nothing running_");
|
||||||
|
}
|
||||||
},
|
},
|
||||||
);
|
|
||||||
|
async handleEvent(event: SlackEvent, slack: SlackBot): Promise<void> {
|
||||||
|
const state = getState(event.channel);
|
||||||
|
const channelDir = join(workingDir, event.channel);
|
||||||
|
|
||||||
|
// Start run
|
||||||
|
state.running = true;
|
||||||
|
state.stopRequested = false;
|
||||||
|
|
||||||
|
log.logInfo(`[${event.channel}] Starting run: ${event.text.substring(0, 50)}`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// SYNC context from log.jsonl BEFORE processing
|
||||||
|
// This adds any messages that were logged while mom wasn't running
|
||||||
|
// Exclude messages >= current ts (will be handled by agent)
|
||||||
|
const syncedCount = syncLogToContext(channelDir, event.ts);
|
||||||
|
if (syncedCount > 0) {
|
||||||
|
log.logInfo(`[${event.channel}] Synced ${syncedCount} messages from log to context`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create context adapter
|
||||||
|
const ctx = createSlackContext(event, slack, state);
|
||||||
|
|
||||||
|
// Run the agent
|
||||||
|
await ctx.setTyping(true);
|
||||||
|
await ctx.setWorking(true);
|
||||||
|
const result = await state.runner.run(ctx as any, state.store);
|
||||||
|
await ctx.setWorking(false);
|
||||||
|
|
||||||
|
if (result.stopReason === "aborted" && state.stopRequested) {
|
||||||
|
if (state.stopMessageTs) {
|
||||||
|
await slack.updateMessage(event.channel, state.stopMessageTs, "_Stopped_");
|
||||||
|
state.stopMessageTs = undefined;
|
||||||
|
} else {
|
||||||
|
await slack.postMessage(event.channel, "_Stopped_");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
log.logWarning(`[${event.channel}] Run error`, err instanceof Error ? err.message : String(err));
|
||||||
|
} finally {
|
||||||
|
state.running = false;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Start
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
log.logStartup(workingDir, sandbox.type === "host" ? "host" : `docker:${sandbox.container}`);
|
||||||
|
|
||||||
|
const bot = new SlackBotClass(handler, {
|
||||||
|
appToken: MOM_SLACK_APP_TOKEN,
|
||||||
|
botToken: MOM_SLACK_BOT_TOKEN,
|
||||||
|
workingDir,
|
||||||
|
});
|
||||||
|
|
||||||
bot.start();
|
bot.start();
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue