diff --git a/package-lock.json b/package-lock.json index 214cd092..9579db12 100644 --- a/package-lock.json +++ b/package-lock.json @@ -255,6 +255,7 @@ "cpu": [ "ppc64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -271,6 +272,7 @@ "cpu": [ "arm" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -287,6 +289,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -303,6 +306,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -319,6 +323,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -335,6 +340,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -351,6 +357,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -367,6 +374,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -383,6 +391,7 @@ "cpu": [ "arm" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -399,6 +408,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -415,6 +425,7 @@ "cpu": [ "ia32" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -431,6 +442,7 @@ "cpu": [ "loong64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -447,6 +459,7 @@ "cpu": [ "mips64el" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -463,6 +476,7 @@ "cpu": [ "ppc64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -479,6 +493,7 @@ "cpu": [ "riscv64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -495,6 +510,7 @@ "cpu": [ "s390x" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -511,6 +527,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -527,6 +544,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -543,6 +561,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -559,6 +578,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -575,6 +595,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -591,6 +612,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -607,6 +629,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -623,6 +646,7 @@ "cpu": [ "arm64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -639,6 +663,7 @@ "cpu": [ "ia32" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -655,6 +680,7 @@ "cpu": [ "x64" ], + "dev": true, "license": "MIT", "optional": true, "os": [ @@ -2985,6 +3011,15 @@ "integrity": "sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ==", "license": "MIT" }, + "node_modules/croner": { + "version": "9.1.0", + "resolved": "https://registry.npmjs.org/croner/-/croner-9.1.0.tgz", + "integrity": "sha512-p9nwwR4qyT5W996vBZhdvBCnMhicY5ytZkR4D1Xj0wuTDEiMnjwR57Q3RXYY/s0EpX6Ay3vgIcfaR+ewGHsi+g==", + "license": "MIT", + "engines": { + "node": ">=18.0" + } + }, "node_modules/cross-spawn": { "version": "7.0.6", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", @@ -6642,6 +6677,7 @@ "@slack/socket-mode": "^2.0.0", "@slack/web-api": "^7.0.0", "chalk": "^5.6.2", + "croner": "^9.1.0", "diff": "^8.0.2" }, "bin": { diff --git a/packages/mom/CHANGELOG.md b/packages/mom/CHANGELOG.md index 8c4c90c5..ae7a4cd6 100644 --- a/packages/mom/CHANGELOG.md +++ b/packages/mom/CHANGELOG.md @@ -2,6 +2,16 @@ ## [Unreleased] +### Added + +- Events system: schedule wake-ups via JSON files in `workspace/events/` + - Immediate events: trigger when file is created (for webhooks, external signals) + - One-shot events: trigger at specific time (for reminders) + - Periodic events: trigger on cron schedule (for recurring tasks) +- `SlackBot.enqueueEvent()` for queueing events (max 5 per channel) +- Events documentation in `docs/events.md` +- System prompt section explaining events to mom + ## [0.18.8] - 2025-12-12 ### Changed diff --git a/packages/mom/docs/events.md b/packages/mom/docs/events.md new file mode 100644 index 00000000..1b7c3f23 --- /dev/null +++ b/packages/mom/docs/events.md @@ -0,0 +1,300 @@ +# Events System + +The events system allows mom to be triggered by scheduled or immediate events. Events are JSON files in the `workspace/events/` directory. The harness watches this directory and executes events when they become due. + +## Event Types + +### Immediate + +Executes as soon as the harness discovers the file. Used by programs mom writes to signal external events (webhooks, file changes, API callbacks, etc.). + +```json +{ + "type": "immediate", + "channelId": "C123ABC", + "text": "New support ticket received: #12345" +} +``` + +After execution, the file is deleted. Staleness is determined by file mtime (see Startup Behavior). + +### One-Shot + +Executes once at a specific date/time. Used for reminders, scheduled tasks, or deferred actions. + +```json +{ + "type": "one-shot", + "channelId": "C123ABC", + "text": "Remind Mario about the dentist appointment", + "at": "2025-12-15T09:00:00+01:00" +} +``` + +The `at` timestamp must include a timezone offset. After execution, the file is deleted. + +### Periodic + +Executes repeatedly on a cron schedule. Used for recurring tasks like daily summaries, weekly reports, or regular checks. + +```json +{ + "type": "periodic", + "channelId": "C123ABC", + "text": "Check inbox and post summary", + "schedule": "0 9 * * 1-5", + "timezone": "Europe/Vienna" +} +``` + +The `schedule` field uses standard cron syntax. The `timezone` field uses IANA timezone names. The file persists until explicitly deleted by mom or the program that created it. + +#### Cron Format + +`minute hour day-of-month month day-of-week` + +Examples: +- `0 9 * * *` — daily at 9:00 +- `0 9 * * 1-5` — weekdays at 9:00 +- `30 14 * * 1` — Mondays at 14:30 +- `0 0 1 * *` — first of each month at midnight +- `*/15 * * * *` — every 15 minutes + +## Timezone Handling + +All timestamps must include timezone information: +- For `one-shot`: Use ISO 8601 format with offset (e.g., `2025-12-15T09:00:00+01:00`) +- For `periodic`: Use the `timezone` field with an IANA timezone name (e.g., `Europe/Vienna`, `America/New_York`) + +The harness runs in the host process timezone. When users mention times without specifying timezone, assume the harness timezone. + +## Harness Behavior + +### Startup + +1. Scan `workspace/events/` for all `.json` files +2. Parse each event file +3. For each event: + - **Immediate**: Check file mtime. If the file was created while the harness was NOT running (mtime < harness start time), it's stale. Delete without executing. Otherwise, execute immediately and delete. + - **One-shot**: If `at` is in the past, delete the file. If `at` is in the future, set a `setTimeout` to execute at the specified time. + - **Periodic**: Set up a cron job (using `croner` library) to execute on the specified schedule. If a scheduled time was missed while harness was down, do NOT catch up. Wait for the next scheduled occurrence. + +### File System Watching + +The harness watches `workspace/events/` using `fs.watch()` with 100ms debounce. + +**New file added:** +- Parse the event +- Based on type: execute immediately, set `setTimeout`, or set up cron job + +**Existing file modified:** +- Cancel any existing timer/cron for this file +- Re-parse and set up again (allows rescheduling) + +**File deleted:** +- Cancel any existing timer/cron for this file + +### Parse Errors + +If a JSON file fails to parse: +1. Retry with exponential backoff (100ms, 200ms, 400ms) +2. If still failing after retries, delete the file and log error to console + +### Execution Errors + +If the agent errors while processing an event: +1. Post error message to the channel +2. Delete the event file (for immediate/one-shot) +3. No retries + +## Queue Integration + +Events integrate with the existing `ChannelQueue` in `SlackBot`: + +- New method: `SlackBot.enqueueEvent(event: SlackEvent)` — always queues, no "already working" rejection +- Maximum 5 events can be queued per channel. If queue is full, discard and log to console. +- User @mom mentions retain current behavior: rejected with "Already working" message if agent is busy + +When an event triggers: +1. Create a synthetic `SlackEvent` with formatted message +2. Call `slack.enqueueEvent(event)` +3. Event waits in queue if agent is busy, processed when idle + +## Event Execution + +When an event is dequeued and executes: + +1. Post status message: "_Starting event: {filename}_" +2. Invoke the agent with message: `[EVENT:{filename}:{type}:{schedule}] {text}` + - For immediate: `[EVENT:webhook-123.json:immediate] New support ticket` + - For one-shot: `[EVENT:dentist.json:one-shot:2025-12-15T09:00:00+01:00] Remind Mario` + - For periodic: `[EVENT:daily-inbox.json:periodic:0 9 * * 1-5] Check inbox` +3. After execution: + - Immediate and one-shot: delete the event file + - Periodic: keep the file, event will trigger again on schedule + +## File Naming + +Event files should have descriptive names ending in `.json`: +- `webhook-12345.json` (immediate) +- `dentist-reminder-2025-12-15.json` (one-shot) +- `daily-inbox-summary.json` (periodic) + +The filename is used as an identifier for tracking timers and in the event message. Avoid special characters. + +## Implementation + +### Files + +- `src/events.ts` — Event parsing, timer management, fs watching +- `src/slack.ts` — Add `enqueueEvent()` method and `size()` to `ChannelQueue` +- `src/main.ts` — Initialize events watcher on startup +- `src/agent.ts` — Update system prompt with events documentation + +### Key Components + +```typescript +// events.ts + +interface ImmediateEvent { + type: "immediate"; + channelId: string; + text: string; +} + +interface OneShotEvent { + type: "one-shot"; + channelId: string; + text: string; + at: string; // ISO 8601 with timezone offset +} + +interface PeriodicEvent { + type: "periodic"; + channelId: string; + text: string; + schedule: string; // cron syntax + timezone: string; // IANA timezone +} + +type MomEvent = ImmediateEvent | OneShotEvent | PeriodicEvent; + +class EventsWatcher { + private timers: Map = new Map(); + private crons: Map = new Map(); + private startTime: number; + + constructor( + private eventsDir: string, + private slack: SlackBot, + private onError: (filename: string, error: Error) => void + ) { + this.startTime = Date.now(); + } + + start(): void { /* scan existing, setup fs.watch */ } + stop(): void { /* cancel all timers/crons, stop watching */ } + + private handleFile(filename: string): void { /* parse, schedule */ } + private handleDelete(filename: string): void { /* cancel timer/cron */ } + private execute(filename: string, event: MomEvent): void { /* enqueue */ } +} +``` + +### Dependencies + +- `croner` — Cron scheduling with timezone support + +## System Prompt Section + +The following should be added to mom's system prompt: + +```markdown +## Events + +You can schedule events that wake you up at specific times or when external things happen. Events are JSON files in `/workspace/events/`. + +### Event Types + +**Immediate** — Triggers as soon as harness sees the file. Use in scripts/webhooks to signal external events. +```json +{"type": "immediate", "channelId": "C123", "text": "New GitHub issue opened"} +``` + +**One-shot** — Triggers once at a specific time. Use for reminders. +```json +{"type": "one-shot", "channelId": "C123", "text": "Remind Mario about dentist", "at": "2025-12-15T09:00:00+01:00"} +``` + +**Periodic** — Triggers on a cron schedule. Use for recurring tasks. +```json +{"type": "periodic", "channelId": "C123", "text": "Check inbox and summarize", "schedule": "0 9 * * 1-5", "timezone": "Europe/Vienna"} +``` + +### Cron Format + +`minute hour day-of-month month day-of-week` + +- `0 9 * * *` = daily at 9:00 +- `0 9 * * 1-5` = weekdays at 9:00 +- `30 14 * * 1` = Mondays at 14:30 +- `0 0 1 * *` = first of each month at midnight + +### Timezones + +All `at` timestamps must include offset (e.g., `+01:00`). Periodic events use IANA timezone names. The harness runs in ${TIMEZONE}. When users mention times without timezone, assume ${TIMEZONE}. + +### Creating Events + +```bash +cat > /workspace/events/dentist-reminder.json << 'EOF' +{"type": "one-shot", "channelId": "${CHANNEL}", "text": "Dentist tomorrow", "at": "2025-12-14T09:00:00+01:00"} +EOF +``` + +### Managing Events + +- List: `ls /workspace/events/` +- View: `cat /workspace/events/foo.json` +- Delete/cancel: `rm /workspace/events/foo.json` + +### When Events Trigger + +You receive a message like: +``` +[EVENT:dentist-reminder.json:one-shot:2025-12-14T09:00:00+01:00] Dentist tomorrow +``` + +Immediate and one-shot events auto-delete after triggering. Periodic events persist until you delete them. + +### Debouncing + +When writing programs that create immediate events (email watchers, webhook handlers, etc.), always debounce. If 50 emails arrive in a minute, don't create 50 immediate events. Instead: + +- Collect events over a window (e.g., 30 seconds) +- Create ONE immediate event summarizing what happened +- Or just signal "new activity, check inbox" rather than per-item events + +Bad: +```bash +# Creates event per email — will flood the queue +on_email() { echo '{"type":"immediate"...}' > /workspace/events/email-$ID.json; } +``` + +Good: +```bash +# Debounce: flag file + single delayed event +on_email() { + echo "$SUBJECT" >> /tmp/pending-emails.txt + if [ ! -f /workspace/events/email-batch.json ]; then + (sleep 30 && mv /tmp/pending-emails.txt /workspace/events/email-batch.json) & + fi +} +``` + +Or simpler: use a periodic event to check for new emails every 15 minutes instead of immediate events. + +### Limits + +Maximum 5 events can be queued. Don't create excessive immediate or periodic events. +``` diff --git a/packages/mom/package.json b/packages/mom/package.json index c5da5a7b..fe432a9f 100644 --- a/packages/mom/package.json +++ b/packages/mom/package.json @@ -28,6 +28,7 @@ "@slack/socket-mode": "^2.0.0", "@slack/web-api": "^7.0.0", "chalk": "^5.6.2", + "croner": "^9.1.0", "diff": "^8.0.2" }, "devDependencies": { diff --git a/packages/mom/src/agent.ts b/packages/mom/src/agent.ts index b59ba233..42cb7d33 100644 --- a/packages/mom/src/agent.ts +++ b/packages/mom/src/agent.ts @@ -160,6 +160,63 @@ Store in \`${workspacePath}/skills//\` or \`${channelPath}/skills//\ Each skill needs a \`SKILL.md\` documenting usage. Read it before using a skill. List skills in global memory so you remember them. +## Events +You can schedule events that wake you up at specific times or when external things happen. Events are JSON files in \`${workspacePath}/events/\`. + +### Event Types + +**Immediate** - Triggers as soon as harness sees the file. Use in scripts/webhooks to signal external events. +\`\`\`json +{"type": "immediate", "channelId": "${channelId}", "text": "New GitHub issue opened"} +\`\`\` + +**One-shot** - Triggers once at a specific time. Use for reminders. +\`\`\`json +{"type": "one-shot", "channelId": "${channelId}", "text": "Remind Mario about dentist", "at": "2025-12-15T09:00:00+01:00"} +\`\`\` + +**Periodic** - Triggers on a cron schedule. Use for recurring tasks. +\`\`\`json +{"type": "periodic", "channelId": "${channelId}", "text": "Check inbox and summarize", "schedule": "0 9 * * 1-5", "timezone": "${Intl.DateTimeFormat().resolvedOptions().timeZone}"} +\`\`\` + +### Cron Format +\`minute hour day-of-month month day-of-week\` +- \`0 9 * * *\` = daily at 9:00 +- \`0 9 * * 1-5\` = weekdays at 9:00 +- \`30 14 * * 1\` = Mondays at 14:30 +- \`0 0 1 * *\` = first of each month at midnight + +### Timezones +All \`at\` timestamps must include offset (e.g., \`+01:00\`). Periodic events use IANA timezone names. The harness runs in ${Intl.DateTimeFormat().resolvedOptions().timeZone}. When users mention times without timezone, assume ${Intl.DateTimeFormat().resolvedOptions().timeZone}. + +### Creating Events +Use unique filenames to avoid overwriting existing events. Include a timestamp or random suffix: +\`\`\`bash +cat > ${workspacePath}/events/dentist-reminder-$(date +%s).json << 'EOF' +{"type": "one-shot", "channelId": "${channelId}", "text": "Dentist tomorrow", "at": "2025-12-14T09:00:00+01:00"} +EOF +\`\`\` +Or check if file exists first before creating. + +### Managing Events +- List: \`ls ${workspacePath}/events/\` +- View: \`cat ${workspacePath}/events/foo.json\` +- Delete/cancel: \`rm ${workspacePath}/events/foo.json\` + +### When Events Trigger +You receive a message like: +\`\`\` +[EVENT:dentist-reminder.json:one-shot:2025-12-14T09:00:00+01:00] Dentist tomorrow +\`\`\` +Immediate and one-shot events auto-delete after triggering. Periodic events persist until you delete them. + +### Debouncing +When writing programs that create immediate events (email watchers, webhook handlers, etc.), always debounce. If 50 emails arrive in a minute, don't create 50 immediate events. Instead collect events over a window and create ONE immediate event summarizing what happened, or just signal "new activity, check inbox" rather than per-item events. Or simpler: use a periodic event to check for new items every N minutes instead of immediate events. + +### Limits +Maximum 5 events can be queued. Don't create excessive immediate or periodic events. + ## Memory Write to MEMORY.md files to persist context across conversations. - Global (${workspacePath}/MEMORY.md): skills, preferences, project info diff --git a/packages/mom/src/events.ts b/packages/mom/src/events.ts new file mode 100644 index 00000000..2bb099e4 --- /dev/null +++ b/packages/mom/src/events.ts @@ -0,0 +1,383 @@ +import { Cron } from "croner"; +import { existsSync, type FSWatcher, mkdirSync, readdirSync, statSync, unlinkSync, watch } from "fs"; +import { readFile } from "fs/promises"; +import { join } from "path"; +import * as log from "./log.js"; +import type { SlackBot, SlackEvent } from "./slack.js"; + +// ============================================================================ +// Event Types +// ============================================================================ + +export interface ImmediateEvent { + type: "immediate"; + channelId: string; + text: string; +} + +export interface OneShotEvent { + type: "one-shot"; + channelId: string; + text: string; + at: string; // ISO 8601 with timezone offset +} + +export interface PeriodicEvent { + type: "periodic"; + channelId: string; + text: string; + schedule: string; // cron syntax + timezone: string; // IANA timezone +} + +export type MomEvent = ImmediateEvent | OneShotEvent | PeriodicEvent; + +// ============================================================================ +// EventsWatcher +// ============================================================================ + +const DEBOUNCE_MS = 100; +const MAX_RETRIES = 3; +const RETRY_BASE_MS = 100; + +export class EventsWatcher { + private timers: Map = new Map(); + private crons: Map = new Map(); + private debounceTimers: Map = new Map(); + private startTime: number; + private watcher: FSWatcher | null = null; + private knownFiles: Set = new Set(); + + constructor( + private eventsDir: string, + private slack: SlackBot, + ) { + this.startTime = Date.now(); + } + + /** + * Start watching for events. Call this after SlackBot is ready. + */ + start(): void { + // Ensure events directory exists + if (!existsSync(this.eventsDir)) { + mkdirSync(this.eventsDir, { recursive: true }); + } + + log.logInfo(`Events watcher starting, dir: ${this.eventsDir}`); + + // Scan existing files + this.scanExisting(); + + // Watch for changes + this.watcher = watch(this.eventsDir, (_eventType, filename) => { + if (!filename || !filename.endsWith(".json")) return; + this.debounce(filename, () => this.handleFileChange(filename)); + }); + + log.logInfo(`Events watcher started, tracking ${this.knownFiles.size} files`); + } + + /** + * Stop watching and cancel all scheduled events. + */ + stop(): void { + // Stop fs watcher + if (this.watcher) { + this.watcher.close(); + this.watcher = null; + } + + // Cancel all debounce timers + for (const timer of this.debounceTimers.values()) { + clearTimeout(timer); + } + this.debounceTimers.clear(); + + // Cancel all scheduled timers + for (const timer of this.timers.values()) { + clearTimeout(timer); + } + this.timers.clear(); + + // Cancel all cron jobs + for (const cron of this.crons.values()) { + cron.stop(); + } + this.crons.clear(); + + this.knownFiles.clear(); + log.logInfo("Events watcher stopped"); + } + + private debounce(filename: string, fn: () => void): void { + const existing = this.debounceTimers.get(filename); + if (existing) { + clearTimeout(existing); + } + this.debounceTimers.set( + filename, + setTimeout(() => { + this.debounceTimers.delete(filename); + fn(); + }, DEBOUNCE_MS), + ); + } + + private scanExisting(): void { + let files: string[]; + try { + files = readdirSync(this.eventsDir).filter((f) => f.endsWith(".json")); + } catch (err) { + log.logWarning("Failed to read events directory", String(err)); + return; + } + + for (const filename of files) { + this.handleFile(filename); + } + } + + private handleFileChange(filename: string): void { + const filePath = join(this.eventsDir, filename); + + if (!existsSync(filePath)) { + // File was deleted + this.handleDelete(filename); + } else if (this.knownFiles.has(filename)) { + // File was modified - cancel existing and re-schedule + this.cancelScheduled(filename); + this.handleFile(filename); + } else { + // New file + this.handleFile(filename); + } + } + + private handleDelete(filename: string): void { + if (!this.knownFiles.has(filename)) return; + + log.logInfo(`Event file deleted: ${filename}`); + this.cancelScheduled(filename); + this.knownFiles.delete(filename); + } + + private cancelScheduled(filename: string): void { + const timer = this.timers.get(filename); + if (timer) { + clearTimeout(timer); + this.timers.delete(filename); + } + + const cron = this.crons.get(filename); + if (cron) { + cron.stop(); + this.crons.delete(filename); + } + } + + private async handleFile(filename: string): Promise { + const filePath = join(this.eventsDir, filename); + + // Parse with retries + let event: MomEvent | null = null; + let lastError: Error | null = null; + + for (let i = 0; i < MAX_RETRIES; i++) { + try { + const content = await readFile(filePath, "utf-8"); + event = this.parseEvent(content, filename); + break; + } catch (err) { + lastError = err instanceof Error ? err : new Error(String(err)); + if (i < MAX_RETRIES - 1) { + await this.sleep(RETRY_BASE_MS * 2 ** i); + } + } + } + + if (!event) { + log.logWarning(`Failed to parse event file after ${MAX_RETRIES} retries: ${filename}`, lastError?.message); + this.deleteFile(filename); + return; + } + + this.knownFiles.add(filename); + + // Schedule based on type + switch (event.type) { + case "immediate": + this.handleImmediate(filename, event); + break; + case "one-shot": + this.handleOneShot(filename, event); + break; + case "periodic": + this.handlePeriodic(filename, event); + break; + } + } + + private parseEvent(content: string, filename: string): MomEvent | null { + const data = JSON.parse(content); + + if (!data.type || !data.channelId || !data.text) { + throw new Error(`Missing required fields (type, channelId, text) in ${filename}`); + } + + switch (data.type) { + case "immediate": + return { type: "immediate", channelId: data.channelId, text: data.text }; + + case "one-shot": + if (!data.at) { + throw new Error(`Missing 'at' field for one-shot event in ${filename}`); + } + return { type: "one-shot", channelId: data.channelId, text: data.text, at: data.at }; + + case "periodic": + if (!data.schedule) { + throw new Error(`Missing 'schedule' field for periodic event in ${filename}`); + } + if (!data.timezone) { + throw new Error(`Missing 'timezone' field for periodic event in ${filename}`); + } + return { + type: "periodic", + channelId: data.channelId, + text: data.text, + schedule: data.schedule, + timezone: data.timezone, + }; + + default: + throw new Error(`Unknown event type '${data.type}' in ${filename}`); + } + } + + private handleImmediate(filename: string, event: ImmediateEvent): void { + const filePath = join(this.eventsDir, filename); + + // Check if stale (created before harness started) + try { + const stat = statSync(filePath); + if (stat.mtimeMs < this.startTime) { + log.logInfo(`Stale immediate event, deleting: ${filename}`); + this.deleteFile(filename); + return; + } + } catch { + // File may have been deleted + return; + } + + log.logInfo(`Executing immediate event: ${filename}`); + this.execute(filename, event); + } + + private handleOneShot(filename: string, event: OneShotEvent): void { + const atTime = new Date(event.at).getTime(); + const now = Date.now(); + + if (atTime <= now) { + // Past - delete without executing + log.logInfo(`One-shot event in the past, deleting: ${filename}`); + this.deleteFile(filename); + return; + } + + const delay = atTime - now; + log.logInfo(`Scheduling one-shot event: ${filename} in ${Math.round(delay / 1000)}s`); + + const timer = setTimeout(() => { + this.timers.delete(filename); + log.logInfo(`Executing one-shot event: ${filename}`); + this.execute(filename, event); + }, delay); + + this.timers.set(filename, timer); + } + + private handlePeriodic(filename: string, event: PeriodicEvent): void { + try { + const cron = new Cron(event.schedule, { timezone: event.timezone }, () => { + log.logInfo(`Executing periodic event: ${filename}`); + this.execute(filename, event, false); // Don't delete periodic events + }); + + this.crons.set(filename, cron); + + const next = cron.nextRun(); + log.logInfo(`Scheduled periodic event: ${filename}, next run: ${next?.toISOString() ?? "unknown"}`); + } catch (err) { + log.logWarning(`Invalid cron schedule for ${filename}: ${event.schedule}`, String(err)); + this.deleteFile(filename); + } + } + + private execute(filename: string, event: MomEvent, deleteAfter: boolean = true): void { + // Format the message + let scheduleInfo: string; + switch (event.type) { + case "immediate": + scheduleInfo = "immediate"; + break; + case "one-shot": + scheduleInfo = event.at; + break; + case "periodic": + scheduleInfo = event.schedule; + break; + } + + const message = `[EVENT:${filename}:${event.type}:${scheduleInfo}] ${event.text}`; + + // Create synthetic SlackEvent + const syntheticEvent: SlackEvent = { + type: "mention", + channel: event.channelId, + user: "EVENT", + text: message, + ts: Date.now().toString(), + }; + + // Enqueue for processing + const enqueued = this.slack.enqueueEvent(syntheticEvent); + + if (enqueued && deleteAfter) { + // Delete file after successful enqueue (immediate and one-shot) + this.deleteFile(filename); + } else if (!enqueued) { + log.logWarning(`Event queue full, discarded: ${filename}`); + // Still delete immediate/one-shot even if discarded + if (deleteAfter) { + this.deleteFile(filename); + } + } + } + + private deleteFile(filename: string): void { + const filePath = join(this.eventsDir, filename); + try { + unlinkSync(filePath); + } catch (err) { + // ENOENT is fine (file already deleted), other errors are warnings + if (err instanceof Error && "code" in err && err.code !== "ENOENT") { + log.logWarning(`Failed to delete event file: ${filename}`, String(err)); + } + } + this.knownFiles.delete(filename); + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} + +/** + * Create and start an events watcher. + */ +export function createEventsWatcher(workspaceDir: string, slack: SlackBot): EventsWatcher { + const eventsDir = join(workspaceDir, "events"); + return new EventsWatcher(eventsDir, slack); +} diff --git a/packages/mom/src/main.ts b/packages/mom/src/main.ts index 8898d73c..02a2bdd8 100644 --- a/packages/mom/src/main.ts +++ b/packages/mom/src/main.ts @@ -4,6 +4,7 @@ import { join, resolve } from "path"; import { type AgentRunner, getOrCreateRunner } from "./agent.js"; import { syncLogToContext } from "./context.js"; import { downloadChannel } from "./download.js"; +import { createEventsWatcher } from "./events.js"; import * as log from "./log.js"; import { parseSandboxArg, type SandboxConfig, validateSandbox } from "./sandbox.js"; import { type MomHandler, type SlackBot, SlackBot as SlackBotClass, type SlackEvent } from "./slack.js"; @@ -113,7 +114,7 @@ function getState(channelId: string): ChannelState { // Create SlackContext adapter // ============================================================================ -function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelState) { +function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelState, isEvent?: boolean) { let messageTs: string | null = null; let accumulatedText = ""; let isWorking = true; @@ -122,6 +123,9 @@ function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelSt const user = slack.getUser(event.user); + // Extract event filename for status message + const eventFilename = isEvent ? event.text.match(/^\[EVENT:([^:]+):/)?.[1] : undefined; + return { message: { text: event.text, @@ -179,8 +183,13 @@ function createSlackContext(event: SlackEvent, slack: SlackBot, state: ChannelSt setTyping: async (isTyping: boolean) => { if (isTyping && !messageTs) { - accumulatedText = "_Thinking_"; - messageTs = await slack.postMessage(event.channel, accumulatedText + workingIndicator); + updatePromise = updatePromise.then(async () => { + if (!messageTs) { + accumulatedText = eventFilename ? `_Starting event: ${eventFilename}_` : "_Thinking_"; + messageTs = await slack.postMessage(event.channel, accumulatedText + workingIndicator); + } + }); + await updatePromise; } }, @@ -223,7 +232,7 @@ const handler: MomHandler = { } }, - async handleEvent(event: SlackEvent, slack: SlackBot): Promise { + async handleEvent(event: SlackEvent, slack: SlackBot, isEvent?: boolean): Promise { const state = getState(event.channel); const channelDir = join(workingDir, event.channel); @@ -243,7 +252,7 @@ const handler: MomHandler = { } // Create context adapter - const ctx = createSlackContext(event, slack, state); + const ctx = createSlackContext(event, slack, state, isEvent); // Run the agent await ctx.setTyping(true); @@ -283,4 +292,21 @@ const bot = new SlackBotClass(handler, { store: sharedStore, }); +// Start events watcher +const eventsWatcher = createEventsWatcher(workingDir, bot); +eventsWatcher.start(); + +// Handle shutdown +process.on("SIGINT", () => { + log.logInfo("Shutting down..."); + eventsWatcher.stop(); + process.exit(0); +}); + +process.on("SIGTERM", () => { + log.logInfo("Shutting down..."); + eventsWatcher.stop(); + process.exit(0); +}); + bot.start(); diff --git a/packages/mom/src/slack.ts b/packages/mom/src/slack.ts index 5e68eaa2..315172c0 100644 --- a/packages/mom/src/slack.ts +++ b/packages/mom/src/slack.ts @@ -72,9 +72,10 @@ export interface MomHandler { /** * Handle an event that triggers mom (ASYNC) - * Called only when isRunning() returned false + * Called only when isRunning() returned false for user messages. + * Events always queue and pass isEvent=true. */ - handleEvent(event: SlackEvent, slack: SlackBot): Promise; + handleEvent(event: SlackEvent, slack: SlackBot, isEvent?: boolean): Promise; /** * Handle stop command (ASYNC) @@ -98,6 +99,10 @@ class ChannelQueue { this.processNext(); } + size(): number { + return this.queue.length; + } + private async processNext(): Promise { if (this.processing || this.queue.length === 0) return; this.processing = true; @@ -226,6 +231,25 @@ export class SlackBot { }); } + // ========================================================================== + // Events Integration + // ========================================================================== + + /** + * Enqueue an event for processing. Always queues (no "already working" rejection). + * Returns true if enqueued, false if queue is full (max 5). + */ + enqueueEvent(event: SlackEvent): boolean { + const queue = this.getQueue(event.channel); + if (queue.size() >= 5) { + log.logWarning(`Event queue full for ${event.channel}, discarding: ${event.text.substring(0, 50)}`); + return false; + } + log.logInfo(`Enqueueing event for ${event.channel}: ${event.text.substring(0, 50)}`); + queue.enqueue(() => this.handler.handleEvent(event, this, true)); + return true; + } + // ========================================================================== // Private - Event Handlers // ========================================================================== diff --git a/pi-mono.code-workspace b/pi-mono.code-workspace index dc380bb7..7cfaae8b 100644 --- a/pi-mono.code-workspace +++ b/pi-mono.code-workspace @@ -3,6 +3,9 @@ { "name": "pi-mono", "path": "." + }, + { + "path": "../../moms" } ], "settings": {}