mom: add events system for scheduled wake-ups

- Three event types: immediate, one-shot, periodic (cron)
- Events are JSON files in workspace/events/
- EventsWatcher with fs.watch, 100ms debounce
- Queue integration via SlackBot.enqueueEvent() (max 5)
- Fix setTyping race condition causing duplicate messages
- System prompt documents events for mom
- Design doc in docs/events.md
- Add croner dependency for cron scheduling
This commit is contained in:
Mario Zechner 2025-12-12 22:45:34 +01:00
parent 03c404c15f
commit d6809328da
9 changed files with 847 additions and 7 deletions

View file

@ -160,6 +160,63 @@ Store in \`${workspacePath}/skills/<name>/\` or \`${channelPath}/skills/<name>/\
Each skill needs a \`SKILL.md\` documenting usage. Read it before using a skill.
List skills in global memory so you remember them.
## Events
You can schedule events that wake you up at specific times or when external things happen. Events are JSON files in \`${workspacePath}/events/\`.
### Event Types
**Immediate** - Triggers as soon as harness sees the file. Use in scripts/webhooks to signal external events.
\`\`\`json
{"type": "immediate", "channelId": "${channelId}", "text": "New GitHub issue opened"}
\`\`\`
**One-shot** - Triggers once at a specific time. Use for reminders.
\`\`\`json
{"type": "one-shot", "channelId": "${channelId}", "text": "Remind Mario about dentist", "at": "2025-12-15T09:00:00+01:00"}
\`\`\`
**Periodic** - Triggers on a cron schedule. Use for recurring tasks.
\`\`\`json
{"type": "periodic", "channelId": "${channelId}", "text": "Check inbox and summarize", "schedule": "0 9 * * 1-5", "timezone": "${Intl.DateTimeFormat().resolvedOptions().timeZone}"}
\`\`\`
### Cron Format
\`minute hour day-of-month month day-of-week\`
- \`0 9 * * *\` = daily at 9:00
- \`0 9 * * 1-5\` = weekdays at 9:00
- \`30 14 * * 1\` = Mondays at 14:30
- \`0 0 1 * *\` = first of each month at midnight
### Timezones
All \`at\` timestamps must include offset (e.g., \`+01:00\`). Periodic events use IANA timezone names. The harness runs in ${Intl.DateTimeFormat().resolvedOptions().timeZone}. When users mention times without timezone, assume ${Intl.DateTimeFormat().resolvedOptions().timeZone}.
### Creating Events
Use unique filenames to avoid overwriting existing events. Include a timestamp or random suffix:
\`\`\`bash
cat > ${workspacePath}/events/dentist-reminder-$(date +%s).json << 'EOF'
{"type": "one-shot", "channelId": "${channelId}", "text": "Dentist tomorrow", "at": "2025-12-14T09:00:00+01:00"}
EOF
\`\`\`
Or check if file exists first before creating.
### Managing Events
- List: \`ls ${workspacePath}/events/\`
- View: \`cat ${workspacePath}/events/foo.json\`
- Delete/cancel: \`rm ${workspacePath}/events/foo.json\`
### When Events Trigger
You receive a message like:
\`\`\`
[EVENT:dentist-reminder.json:one-shot:2025-12-14T09:00:00+01:00] Dentist tomorrow
\`\`\`
Immediate and one-shot events auto-delete after triggering. Periodic events persist until you delete them.
### Debouncing
When writing programs that create immediate events (email watchers, webhook handlers, etc.), always debounce. If 50 emails arrive in a minute, don't create 50 immediate events. Instead collect events over a window and create ONE immediate event summarizing what happened, or just signal "new activity, check inbox" rather than per-item events. Or simpler: use a periodic event to check for new items every N minutes instead of immediate events.
### Limits
Maximum 5 events can be queued. Don't create excessive immediate or periodic events.
## Memory
Write to MEMORY.md files to persist context across conversations.
- Global (${workspacePath}/MEMORY.md): skills, preferences, project info

383
packages/mom/src/events.ts Normal file
View file

@ -0,0 +1,383 @@
import { Cron } from "croner";
import { existsSync, type FSWatcher, mkdirSync, readdirSync, statSync, unlinkSync, watch } from "fs";
import { readFile } from "fs/promises";
import { join } from "path";
import * as log from "./log.js";
import type { SlackBot, SlackEvent } from "./slack.js";
// ============================================================================
// Event Types
// ============================================================================
export interface ImmediateEvent {
type: "immediate";
channelId: string;
text: string;
}
export interface OneShotEvent {
type: "one-shot";
channelId: string;
text: string;
at: string; // ISO 8601 with timezone offset
}
export interface PeriodicEvent {
type: "periodic";
channelId: string;
text: string;
schedule: string; // cron syntax
timezone: string; // IANA timezone
}
export type MomEvent = ImmediateEvent | OneShotEvent | PeriodicEvent;
// ============================================================================
// EventsWatcher
// ============================================================================
const DEBOUNCE_MS = 100;
const MAX_RETRIES = 3;
const RETRY_BASE_MS = 100;
export class EventsWatcher {
private timers: Map<string, NodeJS.Timeout> = new Map();
private crons: Map<string, Cron> = new Map();
private debounceTimers: Map<string, NodeJS.Timeout> = new Map();
private startTime: number;
private watcher: FSWatcher | null = null;
private knownFiles: Set<string> = new Set();
constructor(
private eventsDir: string,
private slack: SlackBot,
) {
this.startTime = Date.now();
}
/**
* Start watching for events. Call this after SlackBot is ready.
*/
start(): void {
// Ensure events directory exists
if (!existsSync(this.eventsDir)) {
mkdirSync(this.eventsDir, { recursive: true });
}
log.logInfo(`Events watcher starting, dir: ${this.eventsDir}`);
// Scan existing files
this.scanExisting();
// Watch for changes
this.watcher = watch(this.eventsDir, (_eventType, filename) => {
if (!filename || !filename.endsWith(".json")) return;
this.debounce(filename, () => this.handleFileChange(filename));
});
log.logInfo(`Events watcher started, tracking ${this.knownFiles.size} files`);
}
/**
* Stop watching and cancel all scheduled events.
*/
stop(): void {
// Stop fs watcher
if (this.watcher) {
this.watcher.close();
this.watcher = null;
}
// Cancel all debounce timers
for (const timer of this.debounceTimers.values()) {
clearTimeout(timer);
}
this.debounceTimers.clear();
// Cancel all scheduled timers
for (const timer of this.timers.values()) {
clearTimeout(timer);
}
this.timers.clear();
// Cancel all cron jobs
for (const cron of this.crons.values()) {
cron.stop();
}
this.crons.clear();
this.knownFiles.clear();
log.logInfo("Events watcher stopped");
}
private debounce(filename: string, fn: () => void): void {
const existing = this.debounceTimers.get(filename);
if (existing) {
clearTimeout(existing);
}
this.debounceTimers.set(
filename,
setTimeout(() => {
this.debounceTimers.delete(filename);
fn();
}, DEBOUNCE_MS),
);
}
private scanExisting(): void {
let files: string[];
try {
files = readdirSync(this.eventsDir).filter((f) => f.endsWith(".json"));
} catch (err) {
log.logWarning("Failed to read events directory", String(err));
return;
}
for (const filename of files) {
this.handleFile(filename);
}
}
private handleFileChange(filename: string): void {
const filePath = join(this.eventsDir, filename);
if (!existsSync(filePath)) {
// File was deleted
this.handleDelete(filename);
} else if (this.knownFiles.has(filename)) {
// File was modified - cancel existing and re-schedule
this.cancelScheduled(filename);
this.handleFile(filename);
} else {
// New file
this.handleFile(filename);
}
}
private handleDelete(filename: string): void {
if (!this.knownFiles.has(filename)) return;
log.logInfo(`Event file deleted: ${filename}`);
this.cancelScheduled(filename);
this.knownFiles.delete(filename);
}
private cancelScheduled(filename: string): void {
const timer = this.timers.get(filename);
if (timer) {
clearTimeout(timer);
this.timers.delete(filename);
}
const cron = this.crons.get(filename);
if (cron) {
cron.stop();
this.crons.delete(filename);
}
}
private async handleFile(filename: string): Promise<void> {
const filePath = join(this.eventsDir, filename);
// Parse with retries
let event: MomEvent | null = null;
let lastError: Error | null = null;
for (let i = 0; i < MAX_RETRIES; i++) {
try {
const content = await readFile(filePath, "utf-8");
event = this.parseEvent(content, filename);
break;
} catch (err) {
lastError = err instanceof Error ? err : new Error(String(err));
if (i < MAX_RETRIES - 1) {
await this.sleep(RETRY_BASE_MS * 2 ** i);
}
}
}
if (!event) {
log.logWarning(`Failed to parse event file after ${MAX_RETRIES} retries: ${filename}`, lastError?.message);
this.deleteFile(filename);
return;
}
this.knownFiles.add(filename);
// Schedule based on type
switch (event.type) {
case "immediate":
this.handleImmediate(filename, event);
break;
case "one-shot":
this.handleOneShot(filename, event);
break;
case "periodic":
this.handlePeriodic(filename, event);
break;
}
}
private parseEvent(content: string, filename: string): MomEvent | null {
const data = JSON.parse(content);
if (!data.type || !data.channelId || !data.text) {
throw new Error(`Missing required fields (type, channelId, text) in ${filename}`);
}
switch (data.type) {
case "immediate":
return { type: "immediate", channelId: data.channelId, text: data.text };
case "one-shot":
if (!data.at) {
throw new Error(`Missing 'at' field for one-shot event in ${filename}`);
}
return { type: "one-shot", channelId: data.channelId, text: data.text, at: data.at };
case "periodic":
if (!data.schedule) {
throw new Error(`Missing 'schedule' field for periodic event in ${filename}`);
}
if (!data.timezone) {
throw new Error(`Missing 'timezone' field for periodic event in ${filename}`);
}
return {
type: "periodic",
channelId: data.channelId,
text: data.text,
schedule: data.schedule,
timezone: data.timezone,
};
default:
throw new Error(`Unknown event type '${data.type}' in ${filename}`);
}
}
private handleImmediate(filename: string, event: ImmediateEvent): void {
const filePath = join(this.eventsDir, filename);
// Check if stale (created before harness started)
try {
const stat = statSync(filePath);
if (stat.mtimeMs < this.startTime) {
log.logInfo(`Stale immediate event, deleting: ${filename}`);
this.deleteFile(filename);
return;
}
} catch {
// File may have been deleted
return;
}
log.logInfo(`Executing immediate event: ${filename}`);
this.execute(filename, event);
}
private handleOneShot(filename: string, event: OneShotEvent): void {
const atTime = new Date(event.at).getTime();
const now = Date.now();
if (atTime <= now) {
// Past - delete without executing
log.logInfo(`One-shot event in the past, deleting: ${filename}`);
this.deleteFile(filename);
return;
}
const delay = atTime - now;
log.logInfo(`Scheduling one-shot event: ${filename} in ${Math.round(delay / 1000)}s`);
const timer = setTimeout(() => {
this.timers.delete(filename);
log.logInfo(`Executing one-shot event: ${filename}`);
this.execute(filename, event);
}, delay);
this.timers.set(filename, timer);
}
private handlePeriodic(filename: string, event: PeriodicEvent): void {
try {
const cron = new Cron(event.schedule, { timezone: event.timezone }, () => {
log.logInfo(`Executing periodic event: ${filename}`);
this.execute(filename, event, false); // Don't delete periodic events
});
this.crons.set(filename, cron);
const next = cron.nextRun();
log.logInfo(`Scheduled periodic event: ${filename}, next run: ${next?.toISOString() ?? "unknown"}`);
} catch (err) {
log.logWarning(`Invalid cron schedule for ${filename}: ${event.schedule}`, String(err));
this.deleteFile(filename);
}
}
private execute(filename: string, event: MomEvent, deleteAfter: boolean = true): void {
// Format the message
let scheduleInfo: string;
switch (event.type) {
case "immediate":
scheduleInfo = "immediate";
break;
case "one-shot":
scheduleInfo = event.at;
break;
case "periodic":
scheduleInfo = event.schedule;
break;
}
const message = `[EVENT:${filename}:${event.type}:${scheduleInfo}] ${event.text}`;
// Create synthetic SlackEvent
const syntheticEvent: SlackEvent = {
type: "mention",
channel: event.channelId,
user: "EVENT",
text: message,
ts: Date.now().toString(),
};
// Enqueue for processing
const enqueued = this.slack.enqueueEvent(syntheticEvent);
if (enqueued && deleteAfter) {
// Delete file after successful enqueue (immediate and one-shot)
this.deleteFile(filename);
} else if (!enqueued) {
log.logWarning(`Event queue full, discarded: ${filename}`);
// Still delete immediate/one-shot even if discarded
if (deleteAfter) {
this.deleteFile(filename);
}
}
}
private deleteFile(filename: string): void {
const filePath = join(this.eventsDir, filename);
try {
unlinkSync(filePath);
} catch (err) {
// ENOENT is fine (file already deleted), other errors are warnings
if (err instanceof Error && "code" in err && err.code !== "ENOENT") {
log.logWarning(`Failed to delete event file: ${filename}`, String(err));
}
}
this.knownFiles.delete(filename);
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
/**
* Create and start an events watcher.
*/
export function createEventsWatcher(workspaceDir: string, slack: SlackBot): EventsWatcher {
const eventsDir = join(workspaceDir, "events");
return new EventsWatcher(eventsDir, slack);
}

View file

@ -4,6 +4,7 @@ import { join, resolve } from "path";
import { type AgentRunner, getOrCreateRunner } from "./agent.js";
import { syncLogToContext } from "./context.js";
import { downloadChannel } from "./download.js";
import { createEventsWatcher } from "./events.js";
import * as log from "./log.js";
import { parseSandboxArg, type SandboxConfig, validateSandbox } from "./sandbox.js";
import { type MomHandler, type SlackBot, SlackBot as SlackBotClass, type SlackEvent } from "./slack.js";
@ -113,7 +114,7 @@ function getState(channelId: string): ChannelState {
// Create SlackContext adapter
// ============================================================================
function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelState) {
function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelState, isEvent?: boolean) {
let messageTs: string | null = null;
let accumulatedText = "";
let isWorking = true;
@ -122,6 +123,9 @@ function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelSt
const user = slack.getUser(event.user);
// Extract event filename for status message
const eventFilename = isEvent ? event.text.match(/^\[EVENT:([^:]+):/)?.[1] : undefined;
return {
message: {
text: event.text,
@ -179,8 +183,13 @@ function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelSt
setTyping: async (isTyping: boolean) => {
if (isTyping && !messageTs) {
accumulatedText = "_Thinking_";
messageTs = await slack.postMessage(event.channel, accumulatedText + workingIndicator);
updatePromise = updatePromise.then(async () => {
if (!messageTs) {
accumulatedText = eventFilename ? `_Starting event: ${eventFilename}_` : "_Thinking_";
messageTs = await slack.postMessage(event.channel, accumulatedText + workingIndicator);
}
});
await updatePromise;
}
},
@ -223,7 +232,7 @@ const handler: MomHandler = {
}
},
async handleEvent(event: SlackEvent, slack: SlackBot): Promise<void> {
async handleEvent(event: SlackEvent, slack: SlackBot, isEvent?: boolean): Promise<void> {
const state = getState(event.channel);
const channelDir = join(workingDir, event.channel);
@ -243,7 +252,7 @@ const handler: MomHandler = {
}
// Create context adapter
const ctx = createSlackContext(event, slack, state);
const ctx = createSlackContext(event, slack, state, isEvent);
// Run the agent
await ctx.setTyping(true);
@ -283,4 +292,21 @@ const bot = new SlackBotClass(handler, {
store: sharedStore,
});
// Start events watcher
const eventsWatcher = createEventsWatcher(workingDir, bot);
eventsWatcher.start();
// Handle shutdown
process.on("SIGINT", () => {
log.logInfo("Shutting down...");
eventsWatcher.stop();
process.exit(0);
});
process.on("SIGTERM", () => {
log.logInfo("Shutting down...");
eventsWatcher.stop();
process.exit(0);
});
bot.start();

View file

@ -72,9 +72,10 @@ export interface MomHandler {
/**
* Handle an event that triggers mom (ASYNC)
* Called only when isRunning() returned false
* Called only when isRunning() returned false for user messages.
* Events always queue and pass isEvent=true.
*/
handleEvent(event: SlackEvent, slack: SlackBot): Promise<void>;
handleEvent(event: SlackEvent, slack: SlackBot, isEvent?: boolean): Promise<void>;
/**
* Handle stop command (ASYNC)
@ -98,6 +99,10 @@ class ChannelQueue {
this.processNext();
}
size(): number {
return this.queue.length;
}
private async processNext(): Promise<void> {
if (this.processing || this.queue.length === 0) return;
this.processing = true;
@ -226,6 +231,25 @@ export class SlackBot {
});
}
// ==========================================================================
// Events Integration
// ==========================================================================
/**
* Enqueue an event for processing. Always queues (no "already working" rejection).
* Returns true if enqueued, false if queue is full (max 5).
*/
enqueueEvent(event: SlackEvent): boolean {
const queue = this.getQueue(event.channel);
if (queue.size() >= 5) {
log.logWarning(`Event queue full for ${event.channel}, discarding: ${event.text.substring(0, 50)}`);
return false;
}
log.logInfo(`Enqueueing event for ${event.channel}: ${event.text.substring(0, 50)}`);
queue.enqueue(() => this.handler.handleEvent(event, this, true));
return true;
}
// ==========================================================================
// Private - Event Handlers
// ==========================================================================