mom: refactor to use AgentSession for context management

- Export AgentSession, SessionManager, SettingsManager, compaction from coding-agent
- Create MomSessionManager for channel-based context.jsonl storage
- Create MomSettingsManager for mom-specific settings
- Refactor agent.ts to use AgentSession instead of ephemeral Agent
- Split logging: tool results go to context.jsonl, human messages to log.jsonl
- Enable auto-compaction and overflow detection from coding-agent

Part of #115
This commit is contained in:
Mario Zechner 2025-12-11 13:09:01 +01:00
parent de7f71838c
commit 3f6db8e99c
3 changed files with 685 additions and 324 deletions

View file

@ -1,3 +1,27 @@
// Core session management
export {
AgentSession,
type AgentSessionConfig,
type AgentSessionEvent,
type AgentSessionEventListener,
type CompactionResult,
type ModelCycleResult,
type PromptOptions,
type SessionStats,
} from "./core/agent-session.js";
// Compaction
export {
type CutPointResult,
calculateContextTokens,
compact,
DEFAULT_COMPACTION_SETTINGS,
estimateTokens,
findCutPoint,
findTurnStartIndex,
generateSummary,
getLastAssistantUsage,
shouldCompact,
} from "./core/compaction.js";
// Hook system types // Hook system types
export type { export type {
AgentEndEvent, AgentEndEvent,
@ -18,6 +42,32 @@ export type {
TurnEndEvent, TurnEndEvent,
TurnStartEvent, TurnStartEvent,
} from "./core/hooks/index.js"; } from "./core/hooks/index.js";
export { SessionManager } from "./core/session-manager.js"; export { messageTransformer } from "./core/messages.js";
export {
type CompactionEntry,
createSummaryMessage,
getLatestCompactionEntry,
type LoadedSession,
loadSessionFromEntries,
type ModelChangeEntry,
parseSessionEntries,
type SessionEntry,
type SessionHeader,
SessionManager,
type SessionMessageEntry,
SUMMARY_PREFIX,
SUMMARY_SUFFIX,
type ThinkingLevelChangeEntry,
} from "./core/session-manager.js";
export {
type CompactionSettings,
type RetrySettings,
type Settings,
SettingsManager,
} from "./core/settings-manager.js";
// Tools
export { bashTool, codingTools, editTool, readTool, writeTool } from "./core/tools/index.js"; export { bashTool, codingTools, editTool, readTool, writeTool } from "./core/tools/index.js";
// Main entry point
export { main } from "./main.js"; export { main } from "./main.js";

View file

@ -1,15 +1,17 @@
import { Agent, type AgentEvent, ProviderTransport } from "@mariozechner/pi-agent-core"; import { Agent, type AgentEvent, ProviderTransport } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai"; import { getModel } from "@mariozechner/pi-ai";
import { AgentSession, messageTransformer } from "@mariozechner/pi-coding-agent";
import { existsSync, readFileSync } from "fs"; import { existsSync, readFileSync } from "fs";
import { mkdir, writeFile } from "fs/promises"; import { mkdir, writeFile } from "fs/promises";
import { join } from "path"; import { join } from "path";
import { MomSessionManager, MomSettingsManager } from "./context.js";
import * as log from "./log.js"; import * as log from "./log.js";
import { createExecutor, type SandboxConfig } from "./sandbox.js"; import { createExecutor, type SandboxConfig } from "./sandbox.js";
import type { ChannelInfo, SlackContext, UserInfo } from "./slack.js"; import type { ChannelInfo, SlackContext, UserInfo } from "./slack.js";
import type { ChannelStore } from "./store.js"; import type { ChannelStore } from "./store.js";
import { createMomTools, setUploadFunction } from "./tools/index.js"; import { createMomTools, setUploadFunction } from "./tools/index.js";
// Hardcoded model for now // Hardcoded model for now - TODO: make configurable (issue #63)
const model = getModel("anthropic", "claude-sonnet-4-5"); const model = getModel("anthropic", "claude-sonnet-4-5");
/** /**
@ -22,15 +24,13 @@ let tsCounter = 0;
function toSlackTs(): string { function toSlackTs(): string {
const now = Date.now(); const now = Date.now();
if (now === lastTsMs) { if (now === lastTsMs) {
// Same millisecond - increment counter for sub-ms ordering
tsCounter++; tsCounter++;
} else { } else {
// New millisecond - reset counter
lastTsMs = now; lastTsMs = now;
tsCounter = 0; tsCounter = 0;
} }
const seconds = Math.floor(now / 1000); const seconds = Math.floor(now / 1000);
const micros = (now % 1000) * 1000 + tsCounter; // ms to micros + counter const micros = (now % 1000) * 1000 + tsCounter;
return `${seconds}.${micros.toString().padStart(6, "0")}`; return `${seconds}.${micros.toString().padStart(6, "0")}`;
} }
@ -47,95 +47,6 @@ function getAnthropicApiKey(): string {
return key; return key;
} }
/** Shape of one JSON line in a channel's log.jsonl, as consumed by getRecentMessages. */
interface LogMessage {
	/** Date string; only its first 19 characters are shown in the formatted output. */
	date?: string;
	/** Timestamp string; parsed with parseFloat to order messages. */
	ts?: string;
	user?: string;
	userName?: string;
	text?: string;
	attachments?: Array<{ local: string }>;
	isBot?: boolean;
}

/**
 * Read `log.jsonl` from a channel directory and format the most recent
 * conversation turns as tab-separated lines.
 *
 * A "turn" is either a single non-bot message or a run of consecutive bot
 * messages. Turns are collected from the newest message backwards until
 * `turnCount` turns have been gathered, then emitted oldest-first.
 *
 * Lines that are not valid JSON, or that parse to something other than a plain
 * object (`null`, numbers, strings, arrays), are skipped.
 *
 * @param channelDir Directory that contains the channel's `log.jsonl`.
 * @param turnCount Maximum number of turns to include.
 * @returns One `date\tuser\ttext\tattachments` line per message, joined with
 * newlines, or `"(no message history yet)"` when the log is missing or empty.
 */
function getRecentMessages(channelDir: string, turnCount: number): string {
	const logPath = join(channelDir, "log.jsonl");
	if (!existsSync(logPath)) {
		return "(no message history yet)";
	}

	const content = readFileSync(logPath, "utf-8");
	const lines = content.trim().split("\n").filter(Boolean);
	if (lines.length === 0) {
		return "(no message history yet)";
	}

	// Parse all messages and sort by timestamp
	// (attachment downloads can cause out-of-order logging)
	const messages: LogMessage[] = [];
	for (const line of lines) {
		let parsed: unknown;
		try {
			parsed = JSON.parse(line);
		} catch {
			// Best effort: a corrupt line must not hide the rest of the history.
			continue;
		}
		// JSON.parse accepts any JSON value; only plain objects can be log entries.
		// Letting `null` through would throw in the comparator below.
		if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
			messages.push(parsed as LogMessage);
		}
	}
	messages.sort((a, b) => {
		const tsA = parseFloat(a.ts || "0");
		const tsB = parseFloat(b.ts || "0");
		return tsA - tsB;
	});

	// Group into turns while walking backwards so we can stop after the last N turns.
	const turns: LogMessage[][] = [];
	let currentTurn: LogMessage[] = [];
	let lastWasBot: boolean | null = null;

	for (let i = messages.length - 1; i >= 0; i--) {
		const msg = messages[i];
		const isBot = msg.isBot === true;

		if (lastWasBot === null) {
			// Newest message starts the first turn
			currentTurn.unshift(msg);
			lastWasBot = isBot;
		} else if (isBot && lastWasBot) {
			// Consecutive bot messages belong to the same turn
			currentTurn.unshift(msg);
		} else {
			// Transition: close the current turn and start a new one
			turns.unshift(currentTurn);
			currentTurn = [msg];
			lastWasBot = isBot;

			// Enough turns collected; the turn just started is intentionally dropped
			if (turns.length >= turnCount) {
				break;
			}
		}
	}

	// Keep the turn still being built if there is room for it
	if (currentTurn.length > 0 && turns.length < turnCount) {
		turns.unshift(currentTurn);
	}

	// Flatten turns back to messages and format as TSV
	const formatted: string[] = [];
	for (const turn of turns) {
		for (const msg of turn) {
			const date = (msg.date || "").substring(0, 19);
			const user = msg.userName || msg.user || "";
			const text = msg.text || "";
			const attachments = (msg.attachments || []).map((a) => a.local).join(",");
			formatted.push(`${date}\t${user}\t${text}\t${attachments}`);
		}
	}

	return formatted.join("\n");
}
function getMemory(channelDir: string): string { function getMemory(channelDir: string): string {
const parts: string[] = []; const parts: string[] = [];
@ -203,8 +114,9 @@ function buildSystemPrompt(
return `You are mom, a Slack bot assistant. Be concise. No emojis. return `You are mom, a Slack bot assistant. Be concise. No emojis.
## Context ## Context
- For current date/time, call date via the Bash tool. - For current date/time, use: date
- You receive the last 50 conversation turns. If you need older context, search log.jsonl. - You have access to previous conversation context including tool results from prior turns.
- For older history beyond your context, search log.jsonl (contains user messages and your final responses, but not tool results).
## Slack Formatting (mrkdwn, NOT Markdown) ## Slack Formatting (mrkdwn, NOT Markdown)
Bold: *text*, Italic: _text_, Code: \`code\`, Block: \`\`\`code\`\`\`, Links: <url|text> Bold: *text*, Italic: _text_, Code: \`code\`, Block: \`\`\`code\`\`\`, Links: <url|text>
@ -226,7 +138,7 @@ ${workspacePath}/
skills/ # Global CLI tools you create skills/ # Global CLI tools you create
${channelId}/ # This channel ${channelId}/ # This channel
MEMORY.md # Channel-specific memory MEMORY.md # Channel-specific memory
log.jsonl # Full message history log.jsonl # Message history (no tool results)
attachments/ # User-shared files attachments/ # User-shared files
scratch/ # Your working directory scratch/ # Your working directory
skills/ # Channel-specific tools skills/ # Channel-specific tools
@ -255,30 +167,20 @@ Maintain ${workspacePath}/SYSTEM.md to log all environment modifications:
Update this file whenever you modify the environment. On fresh container, read it first to restore your setup. Update this file whenever you modify the environment. On fresh container, read it first to restore your setup.
## Log Queries (CRITICAL: limit output to avoid context overflow) ## Log Queries (for older history)
Format: \`{"date":"...","ts":"...","user":"...","userName":"...","text":"...","isBot":false}\` Format: \`{"date":"...","ts":"...","user":"...","userName":"...","text":"...","isBot":false}\`
The log contains user messages AND your tool calls/results. Filter appropriately. The log contains user messages and your final responses (not tool calls/results).
${isDocker ? "Install jq: apk add jq" : ""} ${isDocker ? "Install jq: apk add jq" : ""}
**Conversation only (excludes tool calls/results) - use for summaries:**
\`\`\`bash \`\`\`bash
# Recent conversation (no [Tool] or [Tool Result] lines) # Recent messages
grep -v '"text":"\\[Tool' log.jsonl | tail -30 | jq -c '{date: .date[0:19], user: (.userName // .user), text}' tail -30 log.jsonl | jq -c '{date: .date[0:19], user: (.userName // .user), text}'
# Yesterday's conversation # Search for specific topic
grep '"date":"2025-11-26' log.jsonl | grep -v '"text":"\\[Tool' | jq -c '{date: .date[0:19], user: (.userName // .user), text}' grep -i "topic" log.jsonl | jq -c '{date: .date[0:19], user: (.userName // .user), text}'
# Specific user's messages # Messages from specific user
grep '"userName":"mario"' log.jsonl | grep -v '"text":"\\[Tool' | tail -20 | jq -c '{date: .date[0:19], text}' grep '"userName":"mario"' log.jsonl | tail -20 | jq -c '{date: .date[0:19], text}'
\`\`\`
**Full details (includes tool calls) - use when you need technical context:**
\`\`\`bash
# Raw recent entries
tail -20 log.jsonl | jq -c '{date: .date[0:19], user: (.userName // .user), text}'
# Count all messages
wc -l log.jsonl
\`\`\` \`\`\`
## Tools ## Tools
@ -298,12 +200,10 @@ function truncate(text: string, maxLen: number): string {
} }
function extractToolResultText(result: unknown): string { function extractToolResultText(result: unknown): string {
// If it's already a string, return it
if (typeof result === "string") { if (typeof result === "string") {
return result; return result;
} }
// If it's an object with content array (tool result format)
if ( if (
result && result &&
typeof result === "object" && typeof result === "object" &&
@ -322,7 +222,6 @@ function extractToolResultText(result: unknown): string {
} }
} }
// Fallback to JSON
return JSON.stringify(result); return JSON.stringify(result);
} }
@ -330,10 +229,8 @@ function formatToolArgsForSlack(_toolName: string, args: Record<string, unknown>
const lines: string[] = []; const lines: string[] = [];
for (const [key, value] of Object.entries(args)) { for (const [key, value] of Object.entries(args)) {
// Skip the label - it's already shown
if (key === "label") continue; if (key === "label") continue;
// For read tool, format path with offset/limit
if (key === "path" && typeof value === "string") { if (key === "path" && typeof value === "string") {
const offset = args.offset as number | undefined; const offset = args.offset as number | undefined;
const limit = args.limit as number | undefined; const limit = args.limit as number | undefined;
@ -345,10 +242,8 @@ function formatToolArgsForSlack(_toolName: string, args: Record<string, unknown>
continue; continue;
} }
// Skip offset/limit since we already handled them
if (key === "offset" || key === "limit") continue; if (key === "offset" || key === "limit") continue;
// For other values, format them
if (typeof value === "string") { if (typeof value === "string") {
lines.push(value); lines.push(value);
} else { } else {
@ -359,8 +254,11 @@ function formatToolArgsForSlack(_toolName: string, args: Record<string, unknown>
return lines.join("\n"); return lines.join("\n");
} }
// Cache for AgentSession per channel
const channelSessions = new Map<string, AgentSession>();
export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner { export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
let agent: Agent | null = null; let currentSession: AgentSession | null = null;
const executor = createExecutor(sandboxConfig); const executor = createExecutor(sandboxConfig);
return { return {
@ -370,7 +268,6 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
const channelId = ctx.message.channel; const channelId = ctx.message.channel;
const workspacePath = executor.getWorkspacePath(channelDir.replace(`/${channelId}`, "")); const workspacePath = executor.getWorkspacePath(channelDir.replace(`/${channelId}`, ""));
const recentMessages = getRecentMessages(channelDir, 50);
const memory = getMemory(channelDir); const memory = getMemory(channelDir);
const systemPrompt = buildSystemPrompt( const systemPrompt = buildSystemPrompt(
@ -383,13 +280,10 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
); );
// Debug: log context sizes // Debug: log context sizes
log.logInfo( log.logInfo(`Context sizes - system: ${systemPrompt.length} chars, memory: ${memory.length} chars`);
`Context sizes - system: ${systemPrompt.length} chars, messages: ${recentMessages.length} chars, memory: ${memory.length} chars`,
);
log.logInfo(`Channels: ${ctx.channels.length}, Users: ${ctx.users.length}`); log.logInfo(`Channels: ${ctx.channels.length}, Users: ${ctx.users.length}`);
// Set up file upload function for the attach tool // Set up file upload function for the attach tool
// For Docker, we need to translate paths back to host
setUploadFunction(async (filePath: string, title?: string) => { setUploadFunction(async (filePath: string, title?: string) => {
const hostPath = translateToHostPath(filePath, channelDir, workspacePath, channelId); const hostPath = translateToHostPath(filePath, channelDir, workspacePath, channelId);
await ctx.uploadFile(hostPath, title); await ctx.uploadFile(hostPath, title);
@ -398,18 +292,49 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
// Create tools with executor // Create tools with executor
const tools = createMomTools(executor); const tools = createMomTools(executor);
// Create ephemeral agent // Get or create AgentSession for this channel
agent = new Agent({ let session = channelSessions.get(channelId);
initialState: {
systemPrompt, if (!session) {
model, // Create session manager and settings manager
thinkingLevel: "off", const sessionManager = new MomSessionManager(channelDir);
tools, const settingsManager = new MomSettingsManager(join(channelDir, ".."));
},
transport: new ProviderTransport({ // Create agent with proper message transformer for compaction
getApiKey: async () => getAnthropicApiKey(), const agent = new Agent({
}), initialState: {
}); systemPrompt,
model,
thinkingLevel: "off",
tools,
},
messageTransformer,
transport: new ProviderTransport({
getApiKey: async () => getAnthropicApiKey(),
}),
});
// Load existing messages from session
const loadedSession = sessionManager.loadSession();
if (loadedSession.messages.length > 0) {
agent.replaceMessages(loadedSession.messages);
log.logInfo(`Loaded ${loadedSession.messages.length} messages from context.jsonl`);
}
// Create AgentSession wrapper
session = new AgentSession({
agent,
sessionManager: sessionManager as any, // Type compatibility
settingsManager: settingsManager as any, // Type compatibility
});
channelSessions.set(channelId, session);
} else {
// Update system prompt for existing session (memory may have changed)
session.agent.setSystemPrompt(systemPrompt);
}
currentSession = session;
// Create logging context // Create logging context
const logCtx = { const logCtx = {
@ -439,7 +364,7 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
// Track stop reason // Track stop reason
let stopReason = "stop"; let stopReason = "stop";
// Slack message limit is 40,000 characters - split into multiple messages if needed // Slack message limit is 40,000 characters
const SLACK_MAX_LENGTH = 40000; const SLACK_MAX_LENGTH = 40000;
const splitForSlack = (text: string): string[] => { const splitForSlack = (text: string): string[] => {
if (text.length <= SLACK_MAX_LENGTH) return [text]; if (text.length <= SLACK_MAX_LENGTH) return [text];
@ -457,7 +382,6 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
}; };
// Promise queue to ensure ctx.respond/respondInThread calls execute in order // Promise queue to ensure ctx.respond/respondInThread calls execute in order
// Handles errors gracefully by posting to thread instead of crashing
const queue = { const queue = {
chain: Promise.resolve(), chain: Promise.resolve(),
enqueue(fn: () => Promise<void>, errorContext: string): void { enqueue(fn: () => Promise<void>, errorContext: string): void {
@ -467,21 +391,19 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
} catch (err) { } catch (err) {
const errMsg = err instanceof Error ? err.message : String(err); const errMsg = err instanceof Error ? err.message : String(err);
log.logWarning(`Slack API error (${errorContext})`, errMsg); log.logWarning(`Slack API error (${errorContext})`, errMsg);
// Try to post error to thread, but don't crash if that fails too
try { try {
await ctx.respondInThread(`_Error: ${errMsg}_`); await ctx.respondInThread(`_Error: ${errMsg}_`);
} catch { } catch {
// Ignore - we tried our best // Ignore
} }
} }
}); });
}, },
// Enqueue a message that may need splitting enqueueMessage(text: string, target: "main" | "thread", errorContext: string, doLog = true): void {
enqueueMessage(text: string, target: "main" | "thread", errorContext: string, log = true): void {
const parts = splitForSlack(text); const parts = splitForSlack(text);
for (const part of parts) { for (const part of parts) {
this.enqueue( this.enqueue(
() => (target === "main" ? ctx.respond(part, log) : ctx.respondInThread(part)), () => (target === "main" ? ctx.respond(part, doLog) : ctx.respondInThread(part)),
errorContext, errorContext,
); );
} }
@ -491,208 +413,209 @@ export function createAgentRunner(sandboxConfig: SandboxConfig): AgentRunner {
}, },
}; };
// Subscribe to events // Subscribe to session events
agent.subscribe(async (event: AgentEvent) => { const unsubscribe = session.subscribe(async (event) => {
switch (event.type) { // Handle core agent events
case "tool_execution_start": { if (event.type === "tool_execution_start") {
const args = event.args as { label?: string }; const agentEvent = event as AgentEvent & { type: "tool_execution_start" };
const label = args.label || event.toolName; const args = agentEvent.args as { label?: string };
const label = args.label || agentEvent.toolName;
// Store args to pair with result later pendingTools.set(agentEvent.toolCallId, {
pendingTools.set(event.toolCallId, { toolName: agentEvent.toolName,
toolName: event.toolName, args: agentEvent.args,
args: event.args, startTime: Date.now(),
startTime: Date.now(), });
});
// Log to console log.logToolStart(logCtx, agentEvent.toolName, label, agentEvent.args as Record<string, unknown>);
log.logToolStart(logCtx, event.toolName, label, event.args as Record<string, unknown>);
// Log to jsonl // NOTE: Tool results are NOT logged to log.jsonl anymore
await store.logMessage(ctx.message.channel, { // They are stored in context.jsonl via AgentSession
date: new Date().toISOString(),
ts: toSlackTs(),
user: "bot",
text: `[Tool] ${event.toolName}: ${JSON.stringify(event.args)}`,
attachments: [],
isBot: true,
});
// Show label in main message only queue.enqueue(() => ctx.respond(`_→ ${label}_`, false), "tool label");
queue.enqueue(() => ctx.respond(`_→ ${label}_`, false), "tool label"); } else if (event.type === "tool_execution_end") {
break; const agentEvent = event as AgentEvent & { type: "tool_execution_end" };
const resultStr = extractToolResultText(agentEvent.result);
const pending = pendingTools.get(agentEvent.toolCallId);
pendingTools.delete(agentEvent.toolCallId);
const durationMs = pending ? Date.now() - pending.startTime : 0;
if (agentEvent.isError) {
log.logToolError(logCtx, agentEvent.toolName, durationMs, resultStr);
} else {
log.logToolSuccess(logCtx, agentEvent.toolName, durationMs, resultStr);
} }
case "tool_execution_end": { // Post args + result to thread (for debugging)
const resultStr = extractToolResultText(event.result); const label = pending?.args ? (pending.args as { label?: string }).label : undefined;
const pending = pendingTools.get(event.toolCallId); const argsFormatted = pending
pendingTools.delete(event.toolCallId); ? formatToolArgsForSlack(agentEvent.toolName, pending.args as Record<string, unknown>)
: "(args not found)";
const duration = (durationMs / 1000).toFixed(1);
let threadMessage = `*${agentEvent.isError ? "✗" : "✓"} ${agentEvent.toolName}*`;
if (label) {
threadMessage += `: ${label}`;
}
threadMessage += ` (${duration}s)\n`;
const durationMs = pending ? Date.now() - pending.startTime : 0; if (argsFormatted) {
threadMessage += "```\n" + argsFormatted + "\n```\n";
// Log to console
if (event.isError) {
log.logToolError(logCtx, event.toolName, durationMs, resultStr);
} else {
log.logToolSuccess(logCtx, event.toolName, durationMs, resultStr);
}
// Log to jsonl
await store.logMessage(ctx.message.channel, {
date: new Date().toISOString(),
ts: toSlackTs(),
user: "bot",
text: `[Tool Result] ${event.toolName}: ${event.isError ? "ERROR: " : ""}${resultStr}`,
attachments: [],
isBot: true,
});
// Post args + result together in thread
const label = pending?.args ? (pending.args as { label?: string }).label : undefined;
const argsFormatted = pending
? formatToolArgsForSlack(event.toolName, pending.args as Record<string, unknown>)
: "(args not found)";
const duration = (durationMs / 1000).toFixed(1);
let threadMessage = `*${event.isError ? "✗" : "✓"} ${event.toolName}*`;
if (label) {
threadMessage += `: ${label}`;
}
threadMessage += ` (${duration}s)\n`;
if (argsFormatted) {
threadMessage += "```\n" + argsFormatted + "\n```\n";
}
threadMessage += "*Result:*\n```\n" + resultStr + "\n```";
queue.enqueueMessage(threadMessage, "thread", "tool result thread", false);
// Show brief error in main message if failed
if (event.isError) {
queue.enqueue(() => ctx.respond(`_Error: ${truncate(resultStr, 200)}_`, false), "tool error");
}
break;
} }
case "message_update": { threadMessage += "*Result:*\n```\n" + resultStr + "\n```";
// No longer stream to console - just track that we're streaming
break; queue.enqueueMessage(threadMessage, "thread", "tool result thread", false);
if (agentEvent.isError) {
queue.enqueue(() => ctx.respond(`_Error: ${truncate(resultStr, 200)}_`, false), "tool error");
} }
} else if (event.type === "message_start") {
const agentEvent = event as AgentEvent & { type: "message_start" };
if (agentEvent.message.role === "assistant") {
log.logResponseStart(logCtx);
}
} else if (event.type === "message_end") {
const agentEvent = event as AgentEvent & { type: "message_end" };
if (agentEvent.message.role === "assistant") {
const assistantMsg = agentEvent.message as any;
case "message_start": if (assistantMsg.stopReason) {
if (event.message.role === "assistant") { stopReason = assistantMsg.stopReason;
log.logResponseStart(logCtx);
} }
break;
case "message_end": if (assistantMsg.usage) {
if (event.message.role === "assistant") { totalUsage.input += assistantMsg.usage.input;
const assistantMsg = event.message as any; // AssistantMessage type totalUsage.output += assistantMsg.usage.output;
totalUsage.cacheRead += assistantMsg.usage.cacheRead;
totalUsage.cacheWrite += assistantMsg.usage.cacheWrite;
totalUsage.cost.input += assistantMsg.usage.cost.input;
totalUsage.cost.output += assistantMsg.usage.cost.output;
totalUsage.cost.cacheRead += assistantMsg.usage.cost.cacheRead;
totalUsage.cost.cacheWrite += assistantMsg.usage.cost.cacheWrite;
totalUsage.cost.total += assistantMsg.usage.cost.total;
}
// Track stop reason const content = agentEvent.message.content;
if (assistantMsg.stopReason) { const thinkingParts: string[] = [];
stopReason = assistantMsg.stopReason; const textParts: string[] = [];
} for (const part of content) {
if (part.type === "thinking") {
// Accumulate usage thinkingParts.push((part as any).thinking);
if (assistantMsg.usage) { } else if (part.type === "text") {
totalUsage.input += assistantMsg.usage.input; textParts.push((part as any).text);
totalUsage.output += assistantMsg.usage.output;
totalUsage.cacheRead += assistantMsg.usage.cacheRead;
totalUsage.cacheWrite += assistantMsg.usage.cacheWrite;
totalUsage.cost.input += assistantMsg.usage.cost.input;
totalUsage.cost.output += assistantMsg.usage.cost.output;
totalUsage.cost.cacheRead += assistantMsg.usage.cost.cacheRead;
totalUsage.cost.cacheWrite += assistantMsg.usage.cost.cacheWrite;
totalUsage.cost.total += assistantMsg.usage.cost.total;
}
// Extract thinking and text from assistant message
const content = event.message.content;
const thinkingParts: string[] = [];
const textParts: string[] = [];
for (const part of content) {
if (part.type === "thinking") {
thinkingParts.push(part.thinking);
} else if (part.type === "text") {
textParts.push(part.text);
}
}
const text = textParts.join("\n");
// Post thinking to main message and thread
for (const thinking of thinkingParts) {
log.logThinking(logCtx, thinking);
queue.enqueueMessage(`_${thinking}_`, "main", "thinking main");
queue.enqueueMessage(`_${thinking}_`, "thread", "thinking thread", false);
}
// Post text to main message and thread
if (text.trim()) {
log.logResponse(logCtx, text);
queue.enqueueMessage(text, "main", "response main");
queue.enqueueMessage(text, "thread", "response thread", false);
} }
} }
break;
const text = textParts.join("\n");
for (const thinking of thinkingParts) {
log.logThinking(logCtx, thinking);
queue.enqueueMessage(`_${thinking}_`, "main", "thinking main");
queue.enqueueMessage(`_${thinking}_`, "thread", "thinking thread", false);
}
if (text.trim()) {
log.logResponse(logCtx, text);
queue.enqueueMessage(text, "main", "response main");
queue.enqueueMessage(text, "thread", "response thread", false);
}
}
} else if (event.type === "auto_compaction_start") {
log.logInfo(`Auto-compaction started (reason: ${(event as any).reason})`);
queue.enqueue(() => ctx.respond("_Compacting context..._", false), "compaction start");
} else if (event.type === "auto_compaction_end") {
const compEvent = event as any;
if (compEvent.result) {
log.logInfo(`Auto-compaction complete: ${compEvent.result.tokensBefore} tokens compacted`);
} else if (compEvent.aborted) {
log.logInfo("Auto-compaction aborted");
}
} else if (event.type === "auto_retry_start") {
const retryEvent = event as any;
log.logWarning(`Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})`, retryEvent.errorMessage);
queue.enqueue(
() => ctx.respond(`_Retrying (${retryEvent.attempt}/${retryEvent.maxAttempts})..._`, false),
"retry",
);
} }
}); });
// Run the agent with user's message try {
// Prepend recent messages to the user prompt (not system prompt) for better caching // Build user message from Slack context
// The current message is already the last entry in recentMessages const userMessage = ctx.message.text;
const userPrompt =
`Conversation history (last 50 turns). Respond to the last message.\n` +
`Format: date TAB user TAB text TAB attachments\n\n` +
recentMessages;
// Debug: write full context to file
const toolDefs = tools.map((t) => ({ name: t.name, description: t.description, parameters: t.parameters }));
const debugPrompt =
`=== SYSTEM PROMPT (${systemPrompt.length} chars) ===\n\n${systemPrompt}\n\n` +
`=== TOOL DEFINITIONS (${JSON.stringify(toolDefs).length} chars) ===\n\n${JSON.stringify(toolDefs, null, 2)}\n\n` +
`=== USER PROMPT (${userPrompt.length} chars) ===\n\n${userPrompt}`;
await writeFile(join(channelDir, "last_prompt.txt"), debugPrompt, "utf-8");
await agent.prompt(userPrompt); // Debug: write context to file
const debugPrompt =
`=== SYSTEM PROMPT (${systemPrompt.length} chars) ===\n\n${systemPrompt}\n\n` +
`=== USER MESSAGE ===\n\n${userMessage}\n\n` +
`=== EXISTING CONTEXT ===\n\n${session.messages.length} messages in context`;
await writeFile(join(channelDir, "last_prompt.txt"), debugPrompt, "utf-8");
// Wait for all queued respond calls to complete // Log user message to log.jsonl (human-readable history)
await queue.flush(); await store.logMessage(ctx.message.channel, {
date: new Date().toISOString(),
ts: toSlackTs(),
user: ctx.message.user,
userName: ctx.message.userName,
text: userMessage,
attachments: ctx.message.attachments || [],
isBot: false,
});
// Get final assistant message text from agent state and replace main message // Send prompt to agent session
const messages = agent.state.messages; await session.prompt(userMessage);
const lastAssistant = messages.filter((m) => m.role === "assistant").pop();
const finalText =
lastAssistant?.content
.filter((c): c is { type: "text"; text: string } => c.type === "text")
.map((c) => c.text)
.join("\n") || "";
if (finalText.trim()) {
try {
// For the main message, truncate if too long (full text is in thread)
const mainText =
finalText.length > SLACK_MAX_LENGTH
? finalText.substring(0, SLACK_MAX_LENGTH - 50) + "\n\n_(see thread for full response)_"
: finalText;
await ctx.replaceMessage(mainText);
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
log.logWarning("Failed to replace message with final text", errMsg);
}
}
// Log usage summary if there was any usage // Wait for all queued Slack messages
if (totalUsage.cost.total > 0) {
const summary = log.logUsageSummary(logCtx, totalUsage);
queue.enqueue(() => ctx.respondInThread(summary), "usage summary");
await queue.flush(); await queue.flush();
}
return { stopReason }; // Get final assistant text and update main message
const messages = session.messages;
const lastAssistant = messages.filter((m) => m.role === "assistant").pop();
const finalText =
lastAssistant?.content
.filter((c): c is { type: "text"; text: string } => c.type === "text")
.map((c) => c.text)
.join("\n") || "";
if (finalText.trim()) {
// Log final response to log.jsonl (human-readable history)
await store.logMessage(ctx.message.channel, {
date: new Date().toISOString(),
ts: toSlackTs(),
user: "bot",
text: finalText,
attachments: [],
isBot: true,
});
try {
const mainText =
finalText.length > SLACK_MAX_LENGTH
? finalText.substring(0, SLACK_MAX_LENGTH - 50) + "\n\n_(see thread for full response)_"
: finalText;
await ctx.replaceMessage(mainText);
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
log.logWarning("Failed to replace message with final text", errMsg);
}
}
// Log usage summary
if (totalUsage.cost.total > 0) {
const summary = log.logUsageSummary(logCtx, totalUsage);
queue.enqueue(() => ctx.respondInThread(summary), "usage summary");
await queue.flush();
}
return { stopReason };
} finally {
unsubscribe();
}
}, },
abort(): void { abort(): void {
agent?.abort(); currentSession?.abort();
}, },
}; };
} }
@ -707,16 +630,13 @@ function translateToHostPath(
channelId: string, channelId: string,
): string { ): string {
if (workspacePath === "/workspace") { if (workspacePath === "/workspace") {
// Docker mode - translate /workspace/channelId/... to host path
const prefix = `/workspace/${channelId}/`; const prefix = `/workspace/${channelId}/`;
if (containerPath.startsWith(prefix)) { if (containerPath.startsWith(prefix)) {
return join(channelDir, containerPath.slice(prefix.length)); return join(channelDir, containerPath.slice(prefix.length));
} }
// Maybe it's just /workspace/...
if (containerPath.startsWith("/workspace/")) { if (containerPath.startsWith("/workspace/")) {
return join(channelDir, "..", containerPath.slice("/workspace/".length)); return join(channelDir, "..", containerPath.slice("/workspace/".length));
} }
} }
// Host mode or already a host path
return containerPath; return containerPath;
} }

391
packages/mom/src/context.ts Normal file
View file

@ -0,0 +1,391 @@
/**
* Context management for mom.
*
* Mom uses two files per channel:
* - context.jsonl: Structured API messages for LLM context (same format as coding-agent sessions)
* - log.jsonl: Human-readable channel history for grep (no tool results)
*
* This module provides:
* - MomSessionManager: Adapts coding-agent's SessionManager for channel-based storage
* - MomSettingsManager: Simple settings for mom (compaction, retry, model preferences)
*/
import type { AgentState, AppMessage } from "@mariozechner/pi-agent-core";
import {
type CompactionEntry,
type LoadedSession,
loadSessionFromEntries,
type ModelChangeEntry,
type SessionEntry,
type SessionHeader,
type SessionMessageEntry,
type ThinkingLevelChangeEntry,
} from "@mariozechner/pi-coding-agent";
import { randomBytes } from "crypto";
import { appendFileSync, existsSync, mkdirSync, readFileSync, writeFileSync } from "fs";
import { dirname, join } from "path";
/**
 * Build a random version-4 UUID string from 16 bytes of `randomBytes` output.
 *
 * @returns A lowercase hex UUID in the 8-4-4-4-12 layout.
 */
function uuidv4(): string {
	const raw = randomBytes(16);
	// Force the high nibble of byte 6 to 0100 (version 4).
	raw[6] = 0x40 | (raw[6] & 0x0f);
	// Force the two high bits of byte 8 to 10 (variant).
	raw[8] = 0x80 | (raw[8] & 0x3f);

	const digits = raw.toString("hex");
	const groupWidths = [8, 4, 4, 4, 12];
	let offset = 0;
	const groups = groupWidths.map((width) => {
		const group = digits.slice(offset, offset + width);
		offset += width;
		return group;
	});
	return groups.join("-");
}
// ============================================================================
// MomSessionManager - Channel-based session management
// ============================================================================
/**
 * Session manager for mom, storing context per Slack channel.
 *
 * Unlike coding-agent which creates timestamped session files, mom uses
 * a single context.jsonl per channel that persists across all @mentions.
 *
 * Entries saved before `startSession` has run are buffered in memory; once the
 * session header has been written, every entry is appended to context.jsonl as
 * one JSON document per line.
 */
export class MomSessionManager {
	private sessionId: string;
	private readonly contextFile: string;
	private readonly channelDir: string;
	/** True once a session header exists (either loaded from disk or written by `startSession`). */
	private sessionInitialized = false;
	/** Entries known to be part of the session (loaded from disk or appended by this instance). */
	private inMemoryEntries: SessionEntry[] = [];
	/** Entries saved before the header was written; flushed by `startSession`. */
	private pendingEntries: SessionEntry[] = [];

	/**
	 * @param channelDir Directory of the channel; created if missing. context.jsonl lives inside it.
	 */
	constructor(channelDir: string) {
		this.channelDir = channelDir;
		this.contextFile = join(channelDir, "context.jsonl");

		// Ensure channel directory exists
		if (!existsSync(channelDir)) {
			mkdirSync(channelDir, { recursive: true });
		}

		// Load existing session or create new
		if (existsSync(this.contextFile)) {
			this.inMemoryEntries = this.loadEntriesFromFile();
			this.sessionId = this.extractSessionId() || uuidv4();
			// An empty (e.g. truncated by reset()) file still needs a header.
			this.sessionInitialized = this.inMemoryEntries.length > 0;
		} else {
			this.sessionId = uuidv4();
		}
	}

	/** Return the id of the first session header in memory, or null when there is none. */
	private extractSessionId(): string | null {
		const header = this.inMemoryEntries.find((entry): entry is SessionHeader => entry.type === "session");
		return header ? header.id : null;
	}

	/**
	 * Parse context.jsonl into entries.
	 *
	 * Lines that are not valid JSON, or whose JSON value is not an object with a
	 * string `type` (e.g. a bare `null` or number), are skipped so that a damaged
	 * file cannot crash callers that read `entry.type`.
	 */
	private loadEntriesFromFile(): SessionEntry[] {
		if (!existsSync(this.contextFile)) return [];

		const content = readFileSync(this.contextFile, "utf8");
		const entries: SessionEntry[] = [];

		for (const line of content.trim().split("\n")) {
			if (!line.trim()) continue;

			let parsed: unknown;
			try {
				parsed = JSON.parse(line);
			} catch {
				// Skip malformed lines
				continue;
			}

			if (typeof parsed !== "object" || parsed === null) continue;
			if (typeof (parsed as { type?: unknown }).type !== "string") continue;
			entries.push(parsed as SessionEntry);
		}

		return entries;
	}

	/** Append a single entry to context.jsonl and mirror it in memory. */
	private appendEntry(entry: SessionEntry): void {
		this.inMemoryEntries.push(entry);
		appendFileSync(this.contextFile, JSON.stringify(entry) + "\n");
	}

	/** Buffer the entry until the header exists, otherwise persist it immediately. */
	private bufferOrAppend(entry: SessionEntry): void {
		if (!this.sessionInitialized) {
			this.pendingEntries.push(entry);
			return;
		}
		this.appendEntry(entry);
	}

	/**
	 * Initialize session with header if not already done.
	 *
	 * Writes the header followed by every buffered entry, in order, with a single
	 * append. Only the header and the buffered entries are written, so entries
	 * that were already persisted are never duplicated.
	 *
	 * @param state Agent state supplying model provider/id and thinking level for the header.
	 */
	startSession(state: AgentState): void {
		if (this.sessionInitialized) return;
		this.sessionInitialized = true;

		const header: SessionHeader = {
			type: "session",
			id: this.sessionId,
			timestamp: new Date().toISOString(),
			cwd: this.channelDir,
			provider: state.model?.provider || "unknown",
			modelId: state.model?.id || "unknown",
			thinkingLevel: state.thinkingLevel,
		};

		const flushed: SessionEntry[] = [header, ...this.pendingEntries];
		this.pendingEntries = [];
		this.inMemoryEntries.push(...flushed);

		// Write to file
		appendFileSync(this.contextFile, flushed.map((entry) => JSON.stringify(entry) + "\n").join(""));
	}

	/** Record a message entry (buffered until the session header has been written). */
	saveMessage(message: AppMessage): void {
		const entry: SessionMessageEntry = {
			type: "message",
			timestamp: new Date().toISOString(),
			message,
		};
		this.bufferOrAppend(entry);
	}

	/** Record a thinking-level change (buffered until the session header has been written). */
	saveThinkingLevelChange(thinkingLevel: string): void {
		const entry: ThinkingLevelChangeEntry = {
			type: "thinking_level_change",
			timestamp: new Date().toISOString(),
			thinkingLevel,
		};
		this.bufferOrAppend(entry);
	}

	/** Record a model change (buffered until the session header has been written). */
	saveModelChange(provider: string, modelId: string): void {
		const entry: ModelChangeEntry = {
			type: "model_change",
			timestamp: new Date().toISOString(),
			provider,
			modelId,
		};
		this.bufferOrAppend(entry);
	}

	/** Persist a compaction entry immediately; unlike the other save methods it is never buffered. */
	saveCompaction(entry: CompactionEntry): void {
		this.appendEntry(entry);
	}

	/** Load session with compaction support */
	loadSession(): LoadedSession {
		const entries = this.loadEntries();
		return loadSessionFromEntries(entries);
	}

	/**
	 * Return the current entries. When context.jsonl exists it is re-read so the
	 * result reflects the file; otherwise a copy of the in-memory entries is returned.
	 */
	loadEntries(): SessionEntry[] {
		// Re-read from file to get latest state
		if (existsSync(this.contextFile)) {
			return this.loadEntriesFromFile();
		}
		return [...this.inMemoryEntries];
	}

	getSessionId(): string {
		return this.sessionId;
	}

	getSessionFile(): string {
		return this.contextFile;
	}

	/**
	 * Check if session should be initialized.
	 *
	 * @returns true when no header exists yet and `messages` contains at least one
	 * user message and at least one assistant message.
	 */
	shouldInitializeSession(messages: AppMessage[]): boolean {
		if (this.sessionInitialized) return false;
		const hasUser = messages.some((m) => m.role === "user");
		const hasAssistant = messages.some((m) => m.role === "assistant");
		return hasUser && hasAssistant;
	}

	/** Reset session (clears context.jsonl) */
	reset(): void {
		this.pendingEntries = [];
		this.inMemoryEntries = [];
		this.sessionInitialized = false;
		this.sessionId = uuidv4();
		// Truncate the context file
		if (existsSync(this.contextFile)) {
			writeFileSync(this.contextFile, "");
		}
	}

	// Compatibility methods for AgentSession

	isEnabled(): boolean {
		return true;
	}

	setSessionFile(_path: string): void {
		// No-op for mom - we always use the channel's context.jsonl
	}

	loadModel(): { provider: string; modelId: string } | null {
		return this.loadSession().model;
	}

	loadThinkingLevel(): string {
		return this.loadSession().thinkingLevel;
	}

	/** Not used by mom but required by AgentSession interface */
	createBranchedSessionFromEntries(_entries: SessionEntry[], _branchBeforeIndex: number): string | null {
		return null; // Mom doesn't support branching
	}
}
// ============================================================================
// MomSettingsManager - Simple settings for mom
// ============================================================================
/** Fully-resolved compaction settings, as returned by MomSettingsManager.getCompactionSettings(). */
export interface MomCompactionSettings {
/** Whether compaction is turned on. */
enabled: boolean;
/** NOTE(review): presumably the token budget held back from the context window — confirm against coding-agent's CompactionSettings. */
reserveTokens: number;
/** NOTE(review): presumably how many recent tokens survive a compaction — confirm against coding-agent's CompactionSettings. */
keepRecentTokens: number;
}
/** Fully-resolved retry settings, as returned by MomSettingsManager.getRetrySettings(). */
export interface MomRetrySettings {
/** Whether retrying is turned on. */
enabled: boolean;
/** NOTE(review): presumably the maximum number of retry attempts — confirm with the consumer of these settings. */
maxRetries: number;
/** NOTE(review): presumably the initial delay between retries, in milliseconds — backoff policy is not visible here. */
baseDelayMs: number;
}
/**
 * Shape of the settings object that MomSettingsManager reads from and writes to settings.json.
 * Every field is optional; `compaction` and `retry` may be partial and are merged over defaults by the manager.
 */
export interface MomSettings {
defaultProvider?: string;
defaultModel?: string;
defaultThinkingLevel?: "off" | "minimal" | "low" | "medium" | "high";
compaction?: Partial<MomCompactionSettings>;
retry?: Partial<MomRetrySettings>;
}
/** Compaction values used for any field that the loaded settings do not specify. */
const DEFAULT_COMPACTION: MomCompactionSettings = {
enabled: true,
reserveTokens: 16384,
keepRecentTokens: 20000,
};
/** Retry values used for any field that the loaded settings do not specify. */
const DEFAULT_RETRY: MomRetrySettings = {
enabled: true,
maxRetries: 3,
baseDelayMs: 2000,
};
/**
 * Settings manager for mom.
 * Stores settings in the workspace root directory.
 *
 * Settings are read once from `<workspaceDir>/settings.json` at construction and
 * written back synchronously by every setter that changes a value.
 */
export class MomSettingsManager {
	private readonly settingsPath: string;
	private settings: MomSettings;

	/**
	 * @param workspaceDir Directory that contains (or will contain) settings.json.
	 */
	constructor(workspaceDir: string) {
		this.settingsPath = join(workspaceDir, "settings.json");
		this.settings = this.load();
	}

	/**
	 * Read settings.json.
	 *
	 * Falls back to empty settings when the file is missing, unreadable, not valid
	 * JSON, or when its top-level value is not a plain object (e.g. `null`, a
	 * number or an array) — otherwise every accessor would fail on property access.
	 */
	private load(): MomSettings {
		if (!existsSync(this.settingsPath)) {
			return {};
		}

		try {
			const parsed: unknown = JSON.parse(readFileSync(this.settingsPath, "utf-8"));
			if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
				return {};
			}
			return parsed as MomSettings;
		} catch {
			// Best effort: an unreadable settings file behaves like no settings at all.
			return {};
		}
	}

	/** Write the current settings to disk; failures are reported on stderr and otherwise ignored. */
	private save(): void {
		try {
			const dir = dirname(this.settingsPath);
			if (!existsSync(dir)) {
				mkdirSync(dir, { recursive: true });
			}
			writeFileSync(this.settingsPath, JSON.stringify(this.settings, null, 2), "utf-8");
		} catch (error) {
			console.error(`Warning: Could not save settings file: ${error}`);
		}
	}

	/** Compaction settings with defaults filled in for every field the file omits. */
	getCompactionSettings(): MomCompactionSettings {
		return {
			...DEFAULT_COMPACTION,
			...this.settings.compaction,
		};
	}

	getCompactionEnabled(): boolean {
		return this.settings.compaction?.enabled ?? DEFAULT_COMPACTION.enabled;
	}

	setCompactionEnabled(enabled: boolean): void {
		this.settings.compaction = { ...this.settings.compaction, enabled };
		this.save();
	}

	/** Retry settings with defaults filled in for every field the file omits. */
	getRetrySettings(): MomRetrySettings {
		return {
			...DEFAULT_RETRY,
			...this.settings.retry,
		};
	}

	getRetryEnabled(): boolean {
		return this.settings.retry?.enabled ?? DEFAULT_RETRY.enabled;
	}

	setRetryEnabled(enabled: boolean): void {
		this.settings.retry = { ...this.settings.retry, enabled };
		this.save();
	}

	getDefaultModel(): string | undefined {
		return this.settings.defaultModel;
	}

	getDefaultProvider(): string | undefined {
		return this.settings.defaultProvider;
	}

	/** Store provider and model id together and persist them. */
	setDefaultModelAndProvider(provider: string, modelId: string): void {
		this.settings.defaultProvider = provider;
		this.settings.defaultModel = modelId;
		this.save();
	}

	/** Stored thinking level, or "off" when none (or an empty value) is stored. */
	getDefaultThinkingLevel(): string {
		return this.settings.defaultThinkingLevel || "off";
	}

	/** Store the thinking level as given (the value is not validated here) and persist it. */
	setDefaultThinkingLevel(level: string): void {
		this.settings.defaultThinkingLevel = level as MomSettings["defaultThinkingLevel"];
		this.save();
	}

	// Compatibility methods for AgentSession

	getQueueMode(): "all" | "one-at-a-time" {
		return "one-at-a-time"; // Mom processes one message at a time
	}

	setQueueMode(_mode: "all" | "one-at-a-time"): void {
		// No-op for mom
	}

	getHookPaths(): string[] {
		return []; // Mom doesn't use hooks
	}

	getHookTimeout(): number {
		return 30000;
	}
}