move pi-mono into companion-cloud as apps/companion-os

- Copy all pi-mono source into apps/companion-os/
- Update Dockerfile to COPY pre-built binary instead of downloading from GitHub Releases
- Update deploy-staging.yml to build pi from source (bun compile) before Docker build
- Add apps/companion-os/** to path triggers
- No more cross-repo dispatch needed

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Harivansh Rathi 2026-03-07 09:22:50 -08:00
commit 0250f72976
579 changed files with 206942 additions and 0 deletions

View file

@ -0,0 +1,423 @@
/**
* pi-channels Built-in Slack adapter (bidirectional).
*
* Outgoing: Slack Web API chat.postMessage.
* Incoming: Socket Mode (WebSocket) for events + slash commands.
*
* Supports:
* - Text messages (channels, groups, DMs, multi-party DMs)
* - @mentions (app_mention events)
* - Slash commands (/aivena by default)
* - Typing indicators (chat action)
* - Thread replies (when replying in threads)
* - Message splitting for long messages (>3000 chars)
* - Channel allowlisting (optional)
*
* Requires:
* - App-level token (xapp-...) for Socket Mode in settings under pi-channels.slack.appToken
* - Bot token (xoxb-...) for Web API in settings under pi-channels.slack.botToken
* - Socket Mode enabled in app settings
*
* Config in ~/.pi/agent/settings.json:
* {
* "pi-channels": {
* "adapters": {
* "slack": {
* "type": "slack",
* "allowedChannelIds": ["C0123456789"],
* "respondToMentionsOnly": true,
* "slashCommand": "/aivena"
* }
* },
* "slack": {
* "appToken": "xapp-1-...",
* "botToken": "xoxb-..."
* }
* }
* }
*/
import { SocketModeClient } from "@slack/socket-mode";
import { WebClient } from "@slack/web-api";
import { getChannelSetting } from "../config.js";
import type {
AdapterConfig,
ChannelAdapter,
ChannelMessage,
OnIncomingMessage,
} from "../types.js";
const MAX_LENGTH = 3000; // Slack block text limit; actual API limit is 4000 but leave margin
// ── Slack event types (subset) ──────────────────────────────────
interface SlackMessageEvent {
type: string;
subtype?: string;
channel: string;
user?: string;
text?: string;
ts: string;
thread_ts?: string;
channel_type?: string;
bot_id?: string;
}
interface SlackMentionEvent {
type: string;
channel: string;
user: string;
text: string;
ts: string;
thread_ts?: string;
}
interface SlackCommandPayload {
command: string;
text: string;
user_id: string;
user_name: string;
channel_id: string;
channel_name: string;
trigger_id: string;
}
// ── Factory ─────────────────────────────────────────────────────
export type SlackAdapterLogger = (
event: string,
data: Record<string, unknown>,
level?: string,
) => void;
export function createSlackAdapter(
config: AdapterConfig,
cwd?: string,
log?: SlackAdapterLogger,
): ChannelAdapter {
// Tokens live in settings under pi-channels.slack (not in the adapter config block)
const appToken =
(cwd ? (getChannelSetting(cwd, "slack.appToken") as string) : null) ??
(config.appToken as string);
const botToken =
(cwd ? (getChannelSetting(cwd, "slack.botToken") as string) : null) ??
(config.botToken as string);
const allowedChannelIds = config.allowedChannelIds as string[] | undefined;
const respondToMentionsOnly = config.respondToMentionsOnly === true;
const slashCommand = (config.slashCommand as string) ?? "/aivena";
if (!appToken)
throw new Error(
"Slack adapter requires appToken (xapp-...) in settings under pi-channels.slack.appToken",
);
if (!botToken)
throw new Error(
"Slack adapter requires botToken (xoxb-...) in settings under pi-channels.slack.botToken",
);
let socketClient: SocketModeClient | null = null;
const webClient = new WebClient(botToken);
let botUserId: string | null = null;
// ── Helpers ─────────────────────────────────────────────
function isAllowed(channelId: string): boolean {
if (!allowedChannelIds || allowedChannelIds.length === 0) return true;
return allowedChannelIds.includes(channelId);
}
/** Strip the bot's own @mention from message text */
function stripBotMention(text: string): string {
if (!botUserId) return text;
// Slack formats mentions as <@U12345>
return text.replace(new RegExp(`<@${botUserId}>\\s*`, "g"), "").trim();
}
/** Build metadata common to all incoming messages */
function buildMetadata(
event: {
channel?: string;
user?: string;
ts?: string;
thread_ts?: string;
channel_type?: string;
},
extra?: Record<string, unknown>,
): Record<string, unknown> {
return {
channelId: event.channel,
userId: event.user,
timestamp: event.ts,
threadTs: event.thread_ts,
channelType: event.channel_type,
...extra,
};
}
// ── Sending ─────────────────────────────────────────────
async function sendSlack(
channelId: string,
text: string,
threadTs?: string,
): Promise<void> {
await webClient.chat.postMessage({
channel: channelId,
text,
thread_ts: threadTs,
// Unfurl links/media is off by default to keep responses clean
unfurl_links: false,
unfurl_media: false,
});
}
// ── Adapter ─────────────────────────────────────────────
return {
direction: "bidirectional" as const,
async sendTyping(_recipient: string): Promise<void> {
// Slack doesn't have a direct "typing" API for bots in channels.
// We can use a reaction or simply no-op. For DMs, there's no API either.
// Best we can do is nothing — Slack bots don't show typing indicators.
},
async send(message: ChannelMessage): Promise<void> {
const prefix = message.source ? `*[${message.source}]*\n` : "";
const full = prefix + message.text;
const threadTs = message.metadata?.threadTs as string | undefined;
if (full.length <= MAX_LENGTH) {
await sendSlack(message.recipient, full, threadTs);
return;
}
// Split long messages at newlines
let remaining = full;
while (remaining.length > 0) {
if (remaining.length <= MAX_LENGTH) {
await sendSlack(message.recipient, remaining, threadTs);
break;
}
let splitAt = remaining.lastIndexOf("\n", MAX_LENGTH);
if (splitAt < MAX_LENGTH / 2) splitAt = MAX_LENGTH;
await sendSlack(
message.recipient,
remaining.slice(0, splitAt),
threadTs,
);
remaining = remaining.slice(splitAt).replace(/^\n/, "");
}
},
async start(onMessage: OnIncomingMessage): Promise<void> {
if (socketClient) return;
// Resolve bot user ID (for stripping self-mentions)
try {
const authResult = await webClient.auth.test();
botUserId = (authResult.user_id as string) ?? null;
} catch {
// Non-fatal — mention stripping just won't work
}
socketClient = new SocketModeClient({
appToken,
// Suppress noisy internal logging
logLevel: "ERROR" as any,
});
// ── Message events ──────────────────────────────
// Socket Mode wraps events in envelopes. The client emits
// typed events: 'message', 'app_mention', 'slash_commands', etc.
// Each handler receives { event, body, ack, ... }
socketClient.on(
"message",
async ({
event,
ack,
}: {
event: SlackMessageEvent;
ack: () => Promise<void>;
}) => {
try {
await ack();
// Ignore bot messages (including our own)
if (event.bot_id || event.subtype === "bot_message") return;
// Ignore message_changed, message_deleted, etc.
if (event.subtype) return;
if (!event.text) return;
if (!isAllowed(event.channel)) return;
// Skip messages that @mention the bot in channels/groups — these are
// handled by the app_mention listener to avoid duplicate responses.
// DMs (im) and multi-party DMs (mpim) don't fire app_mention, so we
// must NOT skip those here.
if (
botUserId &&
(event.channel_type === "channel" ||
event.channel_type === "group") &&
event.text.includes(`<@${botUserId}>`)
)
return;
// In channels/groups, optionally only respond to @mentions
// (app_mention events are handled separately below)
if (
respondToMentionsOnly &&
(event.channel_type === "channel" ||
event.channel_type === "group")
)
return;
// Use channel:threadTs as sender key for threaded conversations
const sender = event.thread_ts
? `${event.channel}:${event.thread_ts}`
: event.channel;
onMessage({
adapter: "slack",
sender,
text: stripBotMention(event.text),
metadata: buildMetadata(event, {
eventType: "message",
}),
});
} catch (err) {
log?.(
"slack-handler-error",
{ handler: "message", error: String(err) },
"ERROR",
);
}
},
);
// ── App mention events ──────────────────────────
socketClient.on(
"app_mention",
async ({
event,
ack,
}: {
event: SlackMentionEvent;
ack: () => Promise<void>;
}) => {
try {
await ack();
if (!isAllowed(event.channel)) return;
const sender = event.thread_ts
? `${event.channel}:${event.thread_ts}`
: event.channel;
onMessage({
adapter: "slack",
sender,
text: stripBotMention(event.text),
metadata: buildMetadata(event, {
eventType: "app_mention",
}),
});
} catch (err) {
log?.(
"slack-handler-error",
{ handler: "app_mention", error: String(err) },
"ERROR",
);
}
},
);
// ── Slash commands ───────────────────────────────
socketClient.on(
"slash_commands",
async ({
body,
ack,
}: {
body: SlackCommandPayload;
ack: (response?: any) => Promise<void>;
}) => {
try {
if (body.command !== slashCommand) {
await ack();
return;
}
if (!body.text?.trim()) {
await ack({ text: `Usage: ${slashCommand} [your message]` });
return;
}
if (!isAllowed(body.channel_id)) {
await ack({
text: "⛔ This command is not available in this channel.",
});
return;
}
// Acknowledge immediately (Slack requires <3s response)
await ack({ text: "🤔 Thinking..." });
onMessage({
adapter: "slack",
sender: body.channel_id,
text: body.text.trim(),
metadata: {
channelId: body.channel_id,
channelName: body.channel_name,
userId: body.user_id,
userName: body.user_name,
eventType: "slash_command",
command: body.command,
},
});
} catch (err) {
log?.(
"slack-handler-error",
{ handler: "slash_commands", error: String(err) },
"ERROR",
);
}
},
);
// ── Interactive payloads (future: button clicks, modals) ──
socketClient.on(
"interactive",
async ({
body: _body,
ack,
}: {
body: any;
ack: () => Promise<void>;
}) => {
try {
await ack();
// TODO: handle interactive payloads (block actions, modals)
} catch (err) {
log?.(
"slack-handler-error",
{ handler: "interactive", error: String(err) },
"ERROR",
);
}
},
);
await socketClient.start();
},
async stop(): Promise<void> {
if (socketClient) {
await socketClient.disconnect();
socketClient = null;
}
},
};
}

View file

@ -0,0 +1,783 @@
/**
* pi-channels Built-in Telegram adapter (bidirectional).
*
* Outgoing: Telegram Bot API sendMessage.
* Incoming: Long-polling via getUpdates.
*
* Supports:
* - Text messages
* - Photos (downloaded temp file passed as image attachment)
* - Documents (text files downloaded content included in message)
* - Voice messages (downloaded transcribed passed as text)
* - Audio files (music/recordings transcribed passed as text)
* - Audio documents (files with audio MIME routed through transcription)
* - File size validation (1MB for docs/photos, 10MB for voice/audio)
* - MIME type filtering (text-like files only for documents)
*
* Config (in settings.json under pi-channels.adapters.telegram):
* {
* "type": "telegram",
* "botToken": "your-telegram-bot-token",
* "parseMode": "Markdown",
* "polling": true,
* "pollingTimeout": 30,
* "allowedChatIds": ["123456789", "-100987654321"]
* }
*/
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
import type {
AdapterConfig,
ChannelAdapter,
ChannelMessage,
IncomingAttachment,
IncomingMessage,
OnIncomingMessage,
TranscriptionConfig,
} from "../types.js";
import {
createTranscriptionProvider,
type TranscriptionProvider,
} from "./transcription.js";
const MAX_LENGTH = 4096;
const MAX_FILE_SIZE = 1_048_576; // 1MB
const MAX_AUDIO_SIZE = 10_485_760; // 10MB — voice/audio files are larger
/** MIME types we treat as text documents (content inlined into the prompt). */
const TEXT_MIME_TYPES = new Set([
"text/plain",
"text/markdown",
"text/csv",
"text/html",
"text/xml",
"text/css",
"text/javascript",
"application/json",
"application/xml",
"application/javascript",
"application/typescript",
"application/x-yaml",
"application/x-toml",
"application/x-sh",
]);
/** File extensions we treat as text even if MIME is generic (application/octet-stream). */
const TEXT_EXTENSIONS = new Set([
".md",
".markdown",
".txt",
".csv",
".json",
".jsonl",
".yaml",
".yml",
".toml",
".xml",
".html",
".htm",
".css",
".js",
".ts",
".tsx",
".jsx",
".py",
".rs",
".go",
".rb",
".php",
".java",
".kt",
".c",
".cpp",
".h",
".sh",
".bash",
".zsh",
".fish",
".sql",
".graphql",
".gql",
".env",
".ini",
".cfg",
".conf",
".properties",
".log",
".gitignore",
".dockerignore",
".editorconfig",
]);
/** Image MIME prefixes. */
function isImageMime(mime: string | undefined): boolean {
if (!mime) return false;
return mime.startsWith("image/");
}
/** Audio MIME types that can be transcribed. */
const AUDIO_MIME_PREFIXES = ["audio/"];
const AUDIO_MIME_TYPES = new Set([
"audio/mpeg",
"audio/mp4",
"audio/ogg",
"audio/wav",
"audio/webm",
"audio/x-m4a",
"audio/flac",
"audio/aac",
"audio/mp3",
"video/ogg", // .ogg containers can be audio-only
]);
function isAudioMime(mime: string | undefined): boolean {
if (!mime) return false;
if (AUDIO_MIME_TYPES.has(mime)) return true;
return AUDIO_MIME_PREFIXES.some((p) => mime.startsWith(p));
}
function isTextDocument(
mimeType: string | undefined,
filename: string | undefined,
): boolean {
if (mimeType && TEXT_MIME_TYPES.has(mimeType)) return true;
if (filename) {
const ext = path.extname(filename).toLowerCase();
if (TEXT_EXTENSIONS.has(ext)) return true;
}
return false;
}
export function createTelegramAdapter(config: AdapterConfig): ChannelAdapter {
const botToken = config.botToken as string;
const parseMode = config.parseMode as string | undefined;
const pollingEnabled = config.polling === true;
const pollingTimeout = (config.pollingTimeout as number) ?? 30;
const allowedChatIds = config.allowedChatIds as string[] | undefined;
if (!botToken) {
throw new Error("Telegram adapter requires botToken");
}
// ── Transcription setup ─────────────────────────────────
const transcriptionConfig = config.transcription as
| TranscriptionConfig
| undefined;
let transcriber: TranscriptionProvider | null = null;
let transcriberError: string | null = null;
if (transcriptionConfig?.enabled) {
try {
transcriber = createTranscriptionProvider(transcriptionConfig);
} catch (err: any) {
transcriberError = err.message ?? "Unknown transcription config error";
console.error(
`[pi-channels] Transcription config error: ${transcriberError}`,
);
}
}
const apiBase = `https://api.telegram.org/bot${botToken}`;
let offset = 0;
let running = false;
let abortController: AbortController | null = null;
// Track temp files for cleanup
const tempFiles: string[] = [];
// ── Telegram API helpers ────────────────────────────────
async function sendTelegram(chatId: string, text: string): Promise<void> {
const body: Record<string, unknown> = { chat_id: chatId, text };
if (parseMode) body.parse_mode = parseMode;
const res = await fetch(`${apiBase}/sendMessage`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text().catch(() => "unknown error");
throw new Error(`Telegram API error ${res.status}: ${err}`);
}
}
async function sendChatAction(
chatId: string,
action = "typing",
): Promise<void> {
try {
await fetch(`${apiBase}/sendChatAction`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ chat_id: chatId, action }),
});
} catch {
// Best-effort
}
}
/**
* Download a file from Telegram by file_id.
* Returns { path, size } or null on failure.
*/
async function downloadFile(
fileId: string,
suggestedName?: string,
maxSize = MAX_FILE_SIZE,
): Promise<{ localPath: string; size: number } | null> {
try {
// Get file info
const infoRes = await fetch(`${apiBase}/getFile?file_id=${fileId}`);
if (!infoRes.ok) return null;
const info = (await infoRes.json()) as {
ok: boolean;
result?: { file_id: string; file_size?: number; file_path?: string };
};
if (!info.ok || !info.result?.file_path) return null;
const fileSize = info.result.file_size ?? 0;
// Size check before downloading
if (fileSize > maxSize) return null;
// Download
const fileUrl = `https://api.telegram.org/file/bot${botToken}/${info.result.file_path}`;
const fileRes = await fetch(fileUrl);
if (!fileRes.ok) return null;
const buffer = Buffer.from(await fileRes.arrayBuffer());
// Double-check size after download
if (buffer.length > maxSize) return null;
// Write to temp file
const ext =
path.extname(info.result.file_path) ||
path.extname(suggestedName || "") ||
"";
const tmpDir = path.join(os.tmpdir(), "pi-channels");
fs.mkdirSync(tmpDir, { recursive: true });
const localPath = path.join(
tmpDir,
`${Date.now()}-${Math.random().toString(36).slice(2)}${ext}`,
);
fs.writeFileSync(localPath, buffer);
tempFiles.push(localPath);
return { localPath, size: buffer.length };
} catch {
return null;
}
}
// ── Message building helpers ────────────────────────────
function buildBaseMetadata(msg: TelegramMessage): Record<string, unknown> {
return {
messageId: msg.message_id,
chatType: msg.chat.type,
chatTitle: msg.chat.title,
userId: msg.from?.id,
username: msg.from?.username,
firstName: msg.from?.first_name,
date: msg.date,
};
}
// ── Incoming (long polling) ─────────────────────────────
async function poll(onMessage: OnIncomingMessage): Promise<void> {
while (running) {
try {
abortController = new AbortController();
const url = `${apiBase}/getUpdates?offset=${offset}&timeout=${pollingTimeout}&allowed_updates=["message"]`;
const res = await fetch(url, {
signal: abortController.signal,
});
if (!res.ok) {
await sleep(5000);
continue;
}
const data = (await res.json()) as {
ok: boolean;
result: Array<{ update_id: number; message?: TelegramMessage }>;
};
if (!data.ok || !data.result?.length) continue;
for (const update of data.result) {
offset = update.update_id + 1;
const msg = update.message;
if (!msg) continue;
const chatId = String(msg.chat.id);
if (allowedChatIds && !allowedChatIds.includes(chatId)) continue;
const incoming = await processMessage(msg, chatId);
if (incoming) onMessage(incoming);
}
} catch (err: any) {
if (err.name === "AbortError") break;
if (running) await sleep(5000);
}
}
}
/**
* Process a single Telegram message into an IncomingMessage.
* Handles text, photos, and documents.
*/
async function processMessage(
msg: TelegramMessage,
chatId: string,
): Promise<IncomingMessage | null> {
const metadata = buildBaseMetadata(msg);
const caption = msg.caption || "";
// ── Photo ──────────────────────────────────────────
if (msg.photo && msg.photo.length > 0) {
// Pick the largest photo (last in array)
const largest = msg.photo[msg.photo.length - 1];
// Size check
if (largest.file_size && largest.file_size > MAX_FILE_SIZE) {
return {
adapter: "telegram",
sender: chatId,
text: "⚠️ Photo too large (max 1MB).",
metadata: { ...metadata, rejected: true },
};
}
const downloaded = await downloadFile(largest.file_id, "photo.jpg");
if (!downloaded) {
return {
adapter: "telegram",
sender: chatId,
text: caption || "📷 (photo — failed to download)",
metadata,
};
}
const attachment: IncomingAttachment = {
type: "image",
path: downloaded.localPath,
filename: "photo.jpg",
mimeType: "image/jpeg",
size: downloaded.size,
};
return {
adapter: "telegram",
sender: chatId,
text: caption || "Describe this image.",
attachments: [attachment],
metadata: { ...metadata, hasPhoto: true },
};
}
// ── Document ───────────────────────────────────────
if (msg.document) {
const doc = msg.document;
const mimeType = doc.mime_type;
const filename = doc.file_name;
// Size check
if (doc.file_size && doc.file_size > MAX_FILE_SIZE) {
return {
adapter: "telegram",
sender: chatId,
text: `⚠️ File too large: ${filename || "document"} (${formatSize(doc.file_size)}, max 1MB).`,
metadata: { ...metadata, rejected: true },
};
}
// Image documents (e.g. uncompressed photos sent as files)
if (isImageMime(mimeType)) {
const downloaded = await downloadFile(doc.file_id, filename);
if (!downloaded) {
return {
adapter: "telegram",
sender: chatId,
text: caption || `📎 ${filename || "image"} (failed to download)`,
metadata,
};
}
const attachment: IncomingAttachment = {
type: "image",
path: downloaded.localPath,
filename: filename || "image",
mimeType: mimeType || "image/jpeg",
size: downloaded.size,
};
return {
adapter: "telegram",
sender: chatId,
text: caption || "Describe this image.",
attachments: [attachment],
metadata: { ...metadata, hasDocument: true, documentType: "image" },
};
}
// Text documents — download and inline content
if (isTextDocument(mimeType, filename)) {
const downloaded = await downloadFile(doc.file_id, filename);
if (!downloaded) {
return {
adapter: "telegram",
sender: chatId,
text:
caption || `📎 ${filename || "document"} (failed to download)`,
metadata,
};
}
const attachment: IncomingAttachment = {
type: "document",
path: downloaded.localPath,
filename: filename || "document",
mimeType: mimeType || "text/plain",
size: downloaded.size,
};
return {
adapter: "telegram",
sender: chatId,
text: caption || `Here is the file ${filename || "document"}.`,
attachments: [attachment],
metadata: { ...metadata, hasDocument: true, documentType: "text" },
};
}
// Audio documents — route through transcription
if (isAudioMime(mimeType)) {
if (!transcriber) {
return {
adapter: "telegram",
sender: chatId,
text: transcriberError
? `⚠️ Audio transcription misconfigured: ${transcriberError}`
: `⚠️ Audio files are not supported. Please type your message.`,
metadata: { ...metadata, rejected: true, hasAudio: true },
};
}
if (doc.file_size && doc.file_size > MAX_AUDIO_SIZE) {
return {
adapter: "telegram",
sender: chatId,
text: `⚠️ Audio file too large: ${filename || "audio"} (${formatSize(doc.file_size)}, max 10MB).`,
metadata: { ...metadata, rejected: true, hasAudio: true },
};
}
const downloaded = await downloadFile(
doc.file_id,
filename,
MAX_AUDIO_SIZE,
);
if (!downloaded) {
return {
adapter: "telegram",
sender: chatId,
text: caption || `🎵 ${filename || "audio"} (failed to download)`,
metadata: { ...metadata, hasAudio: true },
};
}
const result = await transcriber.transcribe(downloaded.localPath);
if (!result.ok || !result.text) {
return {
adapter: "telegram",
sender: chatId,
text: `🎵 ${filename || "audio"} (transcription failed${result.error ? `: ${result.error}` : ""})`,
metadata: { ...metadata, hasAudio: true },
};
}
const label = filename ? `Audio: ${filename}` : "Audio file";
return {
adapter: "telegram",
sender: chatId,
text: `🎵 [${label}]: ${result.text}`,
metadata: { ...metadata, hasAudio: true, audioTitle: filename },
};
}
// Unsupported file type
return {
adapter: "telegram",
sender: chatId,
text: `⚠️ Unsupported file type: ${filename || "document"} (${mimeType || "unknown"}). I can handle text files, images, and audio.`,
metadata: { ...metadata, rejected: true },
};
}
// ── Voice message ──────────────────────────────────
if (msg.voice) {
const voice = msg.voice;
if (!transcriber) {
return {
adapter: "telegram",
sender: chatId,
text: transcriberError
? `⚠️ Voice transcription misconfigured: ${transcriberError}`
: "⚠️ Voice messages are not supported. Please type your message.",
metadata: { ...metadata, rejected: true, hasVoice: true },
};
}
// Size check
if (voice.file_size && voice.file_size > MAX_AUDIO_SIZE) {
return {
adapter: "telegram",
sender: chatId,
text: `⚠️ Voice message too large (${formatSize(voice.file_size)}, max 10MB).`,
metadata: { ...metadata, rejected: true, hasVoice: true },
};
}
const downloaded = await downloadFile(
voice.file_id,
"voice.ogg",
MAX_AUDIO_SIZE,
);
if (!downloaded) {
return {
adapter: "telegram",
sender: chatId,
text: "🎤 (voice message — failed to download)",
metadata: { ...metadata, hasVoice: true },
};
}
const result = await transcriber.transcribe(downloaded.localPath);
if (!result.ok || !result.text) {
return {
adapter: "telegram",
sender: chatId,
text: `🎤 (voice message — transcription failed${result.error ? `: ${result.error}` : ""})`,
metadata: {
...metadata,
hasVoice: true,
voiceDuration: voice.duration,
},
};
}
return {
adapter: "telegram",
sender: chatId,
text: `🎤 [Voice message]: ${result.text}`,
metadata: {
...metadata,
hasVoice: true,
voiceDuration: voice.duration,
},
};
}
// ── Audio file (sent as music) ─────────────────────
if (msg.audio) {
const audio = msg.audio;
if (!transcriber) {
return {
adapter: "telegram",
sender: chatId,
text: transcriberError
? `⚠️ Audio transcription misconfigured: ${transcriberError}`
: "⚠️ Audio files are not supported. Please type your message.",
metadata: { ...metadata, rejected: true, hasAudio: true },
};
}
if (audio.file_size && audio.file_size > MAX_AUDIO_SIZE) {
return {
adapter: "telegram",
sender: chatId,
text: `⚠️ Audio too large (${formatSize(audio.file_size)}, max 10MB).`,
metadata: { ...metadata, rejected: true, hasAudio: true },
};
}
const audioName = audio.title || audio.performer || "audio";
const downloaded = await downloadFile(
audio.file_id,
`${audioName}.mp3`,
MAX_AUDIO_SIZE,
);
if (!downloaded) {
return {
adapter: "telegram",
sender: chatId,
text: caption || `🎵 ${audioName} (failed to download)`,
metadata: { ...metadata, hasAudio: true },
};
}
const result = await transcriber.transcribe(downloaded.localPath);
if (!result.ok || !result.text) {
return {
adapter: "telegram",
sender: chatId,
text: `🎵 ${audioName} (transcription failed${result.error ? `: ${result.error}` : ""})`,
metadata: {
...metadata,
hasAudio: true,
audioTitle: audio.title,
audioDuration: audio.duration,
},
};
}
const label = audio.title
? `Audio: ${audio.title}${audio.performer ? ` by ${audio.performer}` : ""}`
: "Audio";
return {
adapter: "telegram",
sender: chatId,
text: `🎵 [${label}]: ${result.text}`,
metadata: {
...metadata,
hasAudio: true,
audioTitle: audio.title,
audioDuration: audio.duration,
},
};
}
// ── Text ───────────────────────────────────────────
if (msg.text) {
return {
adapter: "telegram",
sender: chatId,
text: msg.text,
metadata,
};
}
// Unsupported message type (sticker, video, etc.) — ignore
return null;
}
// ── Cleanup ─────────────────────────────────────────────
function cleanupTempFiles(): void {
for (const f of tempFiles) {
try {
fs.unlinkSync(f);
} catch {
/* ignore */
}
}
tempFiles.length = 0;
}
// ── Adapter ─────────────────────────────────────────────
return {
direction: "bidirectional" as const,
async sendTyping(recipient: string): Promise<void> {
await sendChatAction(recipient, "typing");
},
async send(message: ChannelMessage): Promise<void> {
const prefix = message.source ? `[${message.source}]\n` : "";
const full = prefix + message.text;
if (full.length <= MAX_LENGTH) {
await sendTelegram(message.recipient, full);
return;
}
// Split long messages at newlines
let remaining = full;
while (remaining.length > 0) {
if (remaining.length <= MAX_LENGTH) {
await sendTelegram(message.recipient, remaining);
break;
}
let splitAt = remaining.lastIndexOf("\n", MAX_LENGTH);
if (splitAt < MAX_LENGTH / 2) splitAt = MAX_LENGTH;
await sendTelegram(message.recipient, remaining.slice(0, splitAt));
remaining = remaining.slice(splitAt).replace(/^\n/, "");
}
},
async start(onMessage: OnIncomingMessage): Promise<void> {
if (!pollingEnabled) return;
if (running) return;
running = true;
poll(onMessage);
},
async stop(): Promise<void> {
running = false;
abortController?.abort();
abortController = null;
cleanupTempFiles();
},
};
}
// ── Telegram API types (subset) ─────────────────────────────────
interface TelegramMessage {
message_id: number;
from?: { id: number; username?: string; first_name?: string };
chat: { id: number; type: string; title?: string };
date: number;
text?: string;
caption?: string;
photo?: Array<{
file_id: string;
file_unique_id: string;
width: number;
height: number;
file_size?: number;
}>;
document?: {
file_id: string;
file_unique_id: string;
file_name?: string;
mime_type?: string;
file_size?: number;
};
voice?: {
file_id: string;
file_unique_id: string;
duration: number;
mime_type?: string;
file_size?: number;
};
audio?: {
file_id: string;
file_unique_id: string;
duration: number;
performer?: string;
title?: string;
mime_type?: string;
file_size?: number;
};
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function formatSize(bytes: number): string {
if (bytes < 1024) return `${bytes}B`;
if (bytes < 1_048_576) return `${(bytes / 1024).toFixed(1)}KB`;
return `${(bytes / 1_048_576).toFixed(1)}MB`;
}

Binary file not shown.

View file

@ -0,0 +1,101 @@
/// transcribe-apple macOS speech-to-text via SFSpeechRecognizer.
///
/// Usage: transcribe-apple <audio-file> [language-code]
/// Prints transcribed text to stdout. Exits 1 on error (message to stderr).
import Foundation
import Speech
guard CommandLine.arguments.count >= 2 else {
FileHandle.standardError.write("Usage: transcribe-apple <audio-file> [language-code]\n".data(using: .utf8)!)
exit(1)
}
let filePath = CommandLine.arguments[1]
let languageCode = CommandLine.arguments.count >= 3 ? CommandLine.arguments[2] : "en-US"
// Normalize short language codes (e.g. "en" "en-US", "no" "nb-NO")
func normalizeLocale(_ code: String) -> Locale {
let mapping: [String: String] = [
"en": "en-US", "no": "nb-NO", "nb": "nb-NO", "nn": "nn-NO",
"sv": "sv-SE", "da": "da-DK", "de": "de-DE", "fr": "fr-FR",
"es": "es-ES", "it": "it-IT", "pt": "pt-BR", "ja": "ja-JP",
"ko": "ko-KR", "zh": "zh-CN", "ru": "ru-RU", "ar": "ar-SA",
"hi": "hi-IN", "pl": "pl-PL", "nl": "nl-NL", "fi": "fi-FI",
]
let resolved = mapping[code] ?? code
return Locale(identifier: resolved)
}
let locale = normalizeLocale(languageCode)
let fileURL = URL(fileURLWithPath: filePath)
guard FileManager.default.fileExists(atPath: filePath) else {
FileHandle.standardError.write("File not found: \(filePath)\n".data(using: .utf8)!)
exit(1)
}
guard let recognizer = SFSpeechRecognizer(locale: locale) else {
FileHandle.standardError.write("Speech recognizer not available for locale: \(locale.identifier)\n".data(using: .utf8)!)
exit(1)
}
guard recognizer.isAvailable else {
FileHandle.standardError.write("Speech recognizer not available (offline model may need download)\n".data(using: .utf8)!)
exit(1)
}
// Request authorization (needed even for on-device recognition)
let semaphore = DispatchSemaphore(value: 0)
var authStatus: SFSpeechRecognizerAuthorizationStatus = .notDetermined
SFSpeechRecognizer.requestAuthorization { status in
authStatus = status
semaphore.signal()
}
semaphore.wait()
guard authStatus == .authorized else {
FileHandle.standardError.write("Speech recognition not authorized (status: \(authStatus.rawValue)). Grant access in System Settings > Privacy > Speech Recognition.\n".data(using: .utf8)!)
exit(1)
}
// Perform recognition
let request = SFSpeechURLRecognitionRequest(url: fileURL)
request.requiresOnDeviceRecognition = true
request.shouldReportPartialResults = false
let resultSemaphore = DispatchSemaphore(value: 0)
var transcribedText: String?
var recognitionError: Error?
recognizer.recognitionTask(with: request) { result, error in
if let error = error {
recognitionError = error
resultSemaphore.signal()
return
}
if let result = result, result.isFinal {
transcribedText = result.bestTranscription.formattedString
resultSemaphore.signal()
}
}
// Wait up to 60 seconds
let timeout = resultSemaphore.wait(timeout: .now() + 60)
if timeout == .timedOut {
FileHandle.standardError.write("Transcription timed out after 60 seconds\n".data(using: .utf8)!)
exit(1)
}
if let error = recognitionError {
FileHandle.standardError.write("Recognition error: \(error.localizedDescription)\n".data(using: .utf8)!)
exit(1)
}
guard let text = transcribedText, !text.isEmpty else {
FileHandle.standardError.write("No speech detected in audio\n".data(using: .utf8)!)
exit(1)
}
print(text)

View file

@ -0,0 +1,299 @@
/**
* pi-channels Pluggable audio transcription.
*
* Supports three providers:
* - "apple" macOS SFSpeechRecognizer (free, offline, no API key)
* - "openai" Whisper API
* - "elevenlabs" Scribe API
*
* Usage:
* const provider = createTranscriptionProvider(config);
* const result = await provider.transcribe("/path/to/audio.ogg", "en");
*/
import { execFile } from "node:child_process";
import * as fs from "node:fs";
import * as path from "node:path";
import type { TranscriptionConfig } from "../types.js";
// ── Public interface ────────────────────────────────────────────
export interface TranscriptionResult {
ok: boolean;
text?: string;
error?: string;
}
export interface TranscriptionProvider {
transcribe(filePath: string, language?: string): Promise<TranscriptionResult>;
}
/** Create a transcription provider from config. */
export function createTranscriptionProvider(
config: TranscriptionConfig,
): TranscriptionProvider {
switch (config.provider) {
case "apple":
return new AppleProvider(config);
case "openai":
return new OpenAIProvider(config);
case "elevenlabs":
return new ElevenLabsProvider(config);
default:
throw new Error(`Unknown transcription provider: ${config.provider}`);
}
}
// ── Helpers ─────────────────────────────────────────────────────
/** Resolve "env:VAR_NAME" patterns to actual environment variable values. */
function resolveEnvValue(value: string | undefined): string | undefined {
if (!value) return undefined;
if (value.startsWith("env:")) {
const envVar = value.slice(4);
return process.env[envVar] || undefined;
}
return value;
}
function validateFile(filePath: string): TranscriptionResult | null {
if (!fs.existsSync(filePath)) {
return { ok: false, error: `File not found: ${filePath}` };
}
const stat = fs.statSync(filePath);
// 25MB limit (Whisper max; Telegram max is 20MB)
if (stat.size > 25 * 1024 * 1024) {
return {
ok: false,
error: `File too large: ${(stat.size / 1024 / 1024).toFixed(1)}MB (max 25MB)`,
};
}
if (stat.size === 0) {
return { ok: false, error: "File is empty" };
}
return null;
}
// ── Apple Provider ──────────────────────────────────────────────
const SWIFT_HELPER_SRC = path.join(
import.meta.dirname,
"transcribe-apple.swift",
);
const SWIFT_HELPER_BIN = path.join(import.meta.dirname, "transcribe-apple");
class AppleProvider implements TranscriptionProvider {
private language: string | undefined;
private compilePromise: Promise<TranscriptionResult> | null = null;
constructor(config: TranscriptionConfig) {
this.language = config.language;
}
async transcribe(
filePath: string,
language?: string,
): Promise<TranscriptionResult> {
if (process.platform !== "darwin") {
return {
ok: false,
error: "Apple transcription is only available on macOS",
};
}
const fileErr = validateFile(filePath);
if (fileErr) return fileErr;
// Compile Swift helper on first use (promise-based lock prevents races)
if (!this.compilePromise) {
this.compilePromise = this.compileHelper();
}
const compileResult = await this.compilePromise;
if (!compileResult.ok) return compileResult;
const lang = language || this.language;
const args = [filePath];
if (lang) args.push(lang);
return new Promise((resolve) => {
execFile(
SWIFT_HELPER_BIN,
args,
{ timeout: 60_000 },
(err, stdout, stderr) => {
if (err) {
resolve({ ok: false, error: stderr?.trim() || err.message });
return;
}
const text = stdout.trim();
if (!text) {
resolve({
ok: false,
error: "Transcription returned empty result",
});
return;
}
resolve({ ok: true, text });
},
);
});
}
private compileHelper(): Promise<TranscriptionResult> {
// Skip if already compiled and binary exists
if (fs.existsSync(SWIFT_HELPER_BIN)) {
return Promise.resolve({ ok: true });
}
if (!fs.existsSync(SWIFT_HELPER_SRC)) {
return Promise.resolve({
ok: false,
error: `Swift helper source not found: ${SWIFT_HELPER_SRC}`,
});
}
return new Promise((resolve) => {
execFile(
"swiftc",
["-O", "-o", SWIFT_HELPER_BIN, SWIFT_HELPER_SRC],
{ timeout: 30_000 },
(err, _stdout, stderr) => {
if (err) {
resolve({
ok: false,
error: `Failed to compile Swift helper: ${stderr?.trim() || err.message}`,
});
return;
}
resolve({ ok: true });
},
);
});
}
}
// ── OpenAI Provider ─────────────────────────────────────────────
class OpenAIProvider implements TranscriptionProvider {
private apiKey: string;
private model: string;
private language: string | undefined;
constructor(config: TranscriptionConfig) {
const key = resolveEnvValue(config.apiKey);
if (!key) throw new Error("OpenAI transcription requires apiKey");
this.apiKey = key;
this.model = config.model || "whisper-1";
this.language = config.language;
}
async transcribe(
filePath: string,
language?: string,
): Promise<TranscriptionResult> {
const fileErr = validateFile(filePath);
if (fileErr) return fileErr;
const lang = language || this.language;
try {
const form = new FormData();
const fileBuffer = fs.readFileSync(filePath);
const filename = path.basename(filePath);
form.append("file", new Blob([fileBuffer]), filename);
form.append("model", this.model);
if (lang) form.append("language", lang);
const response = await fetch(
"https://api.openai.com/v1/audio/transcriptions",
{
method: "POST",
headers: { Authorization: `Bearer ${this.apiKey}` },
body: form,
},
);
if (!response.ok) {
const body = await response.text();
return {
ok: false,
error: `OpenAI API error (${response.status}): ${body.slice(0, 200)}`,
};
}
const data = (await response.json()) as { text?: string };
if (!data.text) {
return { ok: false, error: "OpenAI returned empty transcription" };
}
return { ok: true, text: data.text };
} catch (err: any) {
return {
ok: false,
error: `OpenAI transcription failed: ${err.message}`,
};
}
}
}
// ── ElevenLabs Provider ─────────────────────────────────────────
class ElevenLabsProvider implements TranscriptionProvider {
private apiKey: string;
private model: string;
private language: string | undefined;
constructor(config: TranscriptionConfig) {
const key = resolveEnvValue(config.apiKey);
if (!key) throw new Error("ElevenLabs transcription requires apiKey");
this.apiKey = key;
this.model = config.model || "scribe_v1";
this.language = config.language;
}
async transcribe(
filePath: string,
language?: string,
): Promise<TranscriptionResult> {
const fileErr = validateFile(filePath);
if (fileErr) return fileErr;
const lang = language || this.language;
try {
const form = new FormData();
const fileBuffer = fs.readFileSync(filePath);
const filename = path.basename(filePath);
form.append("file", new Blob([fileBuffer]), filename);
form.append("model_id", this.model);
if (lang) form.append("language_code", lang);
const response = await fetch(
"https://api.elevenlabs.io/v1/speech-to-text",
{
method: "POST",
headers: { "xi-api-key": this.apiKey },
body: form,
},
);
if (!response.ok) {
const body = await response.text();
return {
ok: false,
error: `ElevenLabs API error (${response.status}): ${body.slice(0, 200)}`,
};
}
const data = (await response.json()) as { text?: string };
if (!data.text) {
return { ok: false, error: "ElevenLabs returned empty transcription" };
}
return { ok: true, text: data.text };
} catch (err: any) {
return {
ok: false,
error: `ElevenLabs transcription failed: ${err.message}`,
};
}
}
}

View file

@ -0,0 +1,45 @@
/**
* pi-channels Built-in webhook adapter.
*
* POSTs message as JSON. The recipient field is the webhook URL.
*
* Config:
* {
* "type": "webhook",
* "method": "POST",
* "headers": { "Authorization": "Bearer ..." }
* }
*/
import type {
AdapterConfig,
ChannelAdapter,
ChannelMessage,
} from "../types.js";
export function createWebhookAdapter(config: AdapterConfig): ChannelAdapter {
const method = (config.method as string) ?? "POST";
const extraHeaders = (config.headers as Record<string, string>) ?? {};
return {
direction: "outgoing" as const,
async send(message: ChannelMessage): Promise<void> {
const res = await fetch(message.recipient, {
method,
headers: { "Content-Type": "application/json", ...extraHeaders },
body: JSON.stringify({
text: message.text,
source: message.source,
metadata: message.metadata,
timestamp: new Date().toISOString(),
}),
});
if (!res.ok) {
const err = await res.text().catch(() => "unknown error");
throw new Error(`Webhook error ${res.status}: ${err}`);
}
},
};
}

View file

@ -0,0 +1,425 @@
/**
* pi-channels Chat bridge.
*
* Listens for incoming messages (channel:receive), serializes per sender,
* routes prompts into the live pi gateway runtime, and sends responses
* back via the same adapter. Each sender gets their own FIFO queue.
* Multiple senders run concurrently up to maxConcurrent.
*/
import { readFileSync } from "node:fs";
import type { ImageContent } from "@mariozechner/pi-ai";
import {
type EventBus,
getActiveGatewayRuntime,
} from "@mariozechner/pi-coding-agent";
import type { ChannelRegistry } from "../registry.js";
import type {
BridgeConfig,
IncomingMessage,
QueuedPrompt,
SenderSession,
} from "../types.js";
import { type CommandContext, handleCommand, isCommand } from "./commands.js";
import { startTyping } from "./typing.js";
const BRIDGE_DEFAULTS: Required<BridgeConfig> = {
enabled: false,
sessionMode: "persistent",
sessionRules: [],
idleTimeoutMinutes: 30,
maxQueuePerSender: 5,
timeoutMs: 300_000,
maxConcurrent: 2,
model: null,
typingIndicators: true,
commands: true,
extensions: [],
};
type LogFn = (event: string, data: unknown, level?: string) => void;
let idCounter = 0;
function nextId(): string {
return `msg-${Date.now()}-${++idCounter}`;
}
export class ChatBridge {
private config: Required<BridgeConfig>;
private registry: ChannelRegistry;
private events: EventBus;
private log: LogFn;
private sessions = new Map<string, SenderSession>();
private activeCount = 0;
private running = false;
constructor(
bridgeConfig: BridgeConfig | undefined,
_cwd: string,
registry: ChannelRegistry,
events: EventBus,
log: LogFn = () => {},
) {
this.config = { ...BRIDGE_DEFAULTS, ...bridgeConfig };
this.registry = registry;
this.events = events;
this.log = log;
}
// ── Lifecycle ─────────────────────────────────────────────
start(): void {
if (this.running) return;
if (!getActiveGatewayRuntime()) {
this.log(
"bridge-unavailable",
{ reason: "no active pi gateway runtime" },
"WARN",
);
return;
}
this.running = true;
}
stop(): void {
this.running = false;
for (const session of this.sessions.values()) {
session.abortController?.abort();
}
this.sessions.clear();
this.activeCount = 0;
}
isActive(): boolean {
return this.running;
}
updateConfig(cfg: BridgeConfig): void {
this.config = { ...BRIDGE_DEFAULTS, ...cfg };
}
// ── Main entry point ──────────────────────────────────────
handleMessage(message: IncomingMessage): void {
if (!this.running) return;
const text = message.text?.trim();
const hasAttachments =
message.attachments && message.attachments.length > 0;
if (!text && !hasAttachments) return;
// Rejected messages (too large, unsupported type) — send back directly
if (message.metadata?.rejected) {
this.sendReply(
message.adapter,
message.sender,
text || "⚠️ Unsupported message.",
);
return;
}
const senderKey = `${message.adapter}:${message.sender}`;
// Get or create session
let session = this.sessions.get(senderKey);
if (!session) {
session = this.createSession(message);
this.sessions.set(senderKey, session);
}
// Bot commands (only for text-only messages)
if (text && !hasAttachments && this.config.commands && isCommand(text)) {
const reply = handleCommand(text, session, this.commandContext());
if (reply !== null) {
this.sendReply(message.adapter, message.sender, reply);
return;
}
// Unrecognized command — fall through to agent
}
// Queue depth check
if (session.queue.length >= this.config.maxQueuePerSender) {
this.sendReply(
message.adapter,
message.sender,
`⚠️ Queue full (${this.config.maxQueuePerSender} pending). ` +
`Wait for current prompts to finish or use /abort.`,
);
return;
}
// Enqueue
const queued: QueuedPrompt = {
id: nextId(),
adapter: message.adapter,
sender: message.sender,
text: text || "Describe this.",
attachments: message.attachments,
metadata: message.metadata,
enqueuedAt: Date.now(),
};
session.queue.push(queued);
session.messageCount++;
this.events.emit("bridge:enqueue", {
id: queued.id,
adapter: message.adapter,
sender: message.sender,
queueDepth: session.queue.length,
});
this.processNext(senderKey);
}
// ── Processing ────────────────────────────────────────────
private async processNext(senderKey: string): Promise<void> {
const session = this.sessions.get(senderKey);
if (!session || session.processing || session.queue.length === 0) return;
if (this.activeCount >= this.config.maxConcurrent) return;
session.processing = true;
this.activeCount++;
const prompt = session.queue.shift()!;
// Typing indicator
const adapter = this.registry.getAdapter(prompt.adapter);
const typing = this.config.typingIndicators
? startTyping(adapter, prompt.sender)
: { stop() {} };
const gateway = getActiveGatewayRuntime();
if (!gateway) {
typing.stop();
session.processing = false;
this.activeCount--;
this.sendReply(
prompt.adapter,
prompt.sender,
"❌ pi gateway is not running.",
);
return;
}
this.events.emit("bridge:start", {
id: prompt.id,
adapter: prompt.adapter,
sender: prompt.sender,
text: prompt.text.slice(0, 100),
persistent: true,
});
try {
session.abortController = new AbortController();
const result = await gateway.enqueueMessage({
sessionKey: senderKey,
text: buildPromptText(prompt),
images: collectImageAttachments(prompt.attachments),
source: "extension",
metadata: prompt.metadata,
});
typing.stop();
if (result.ok) {
this.sendReply(prompt.adapter, prompt.sender, result.response);
} else if (result.error === "Aborted by user") {
this.sendReply(prompt.adapter, prompt.sender, "⏹ Aborted.");
} else {
const userError = sanitizeError(result.error);
this.sendReply(
prompt.adapter,
prompt.sender,
result.response || `${userError}`,
);
}
this.events.emit("bridge:complete", {
id: prompt.id,
adapter: prompt.adapter,
sender: prompt.sender,
ok: result.ok,
persistent: true,
});
this.log(
"bridge-complete",
{
id: prompt.id,
adapter: prompt.adapter,
ok: result.ok,
persistent: true,
},
result.ok ? "INFO" : "WARN",
);
} catch (err: unknown) {
typing.stop();
const message = err instanceof Error ? err.message : String(err);
this.log(
"bridge-error",
{ adapter: prompt.adapter, sender: prompt.sender, error: message },
"ERROR",
);
this.sendReply(
prompt.adapter,
prompt.sender,
`❌ Unexpected error: ${message}`,
);
} finally {
session.abortController = null;
session.processing = false;
this.activeCount--;
if (session.queue.length > 0) this.processNext(senderKey);
this.drainWaiting();
}
}
/** After a slot frees up, check other senders waiting for concurrency. */
private drainWaiting(): void {
if (this.activeCount >= this.config.maxConcurrent) return;
for (const [key, session] of this.sessions) {
if (!session.processing && session.queue.length > 0) {
this.processNext(key);
if (this.activeCount >= this.config.maxConcurrent) break;
}
}
}
// ── Session management ────────────────────────────────────
private createSession(message: IncomingMessage): SenderSession {
return {
adapter: message.adapter,
sender: message.sender,
displayName:
(message.metadata?.firstName as string) ||
(message.metadata?.username as string) ||
message.sender,
queue: [],
processing: false,
abortController: null,
messageCount: 0,
startedAt: Date.now(),
};
}
getStats(): {
active: boolean;
sessions: number;
activePrompts: number;
totalQueued: number;
} {
let totalQueued = 0;
for (const s of this.sessions.values()) totalQueued += s.queue.length;
return {
active: this.running,
sessions: this.sessions.size,
activePrompts: this.activeCount,
totalQueued,
};
}
getSessions(): Map<string, SenderSession> {
return this.sessions;
}
// ── Command context ───────────────────────────────────────
private commandContext(): CommandContext {
const gateway = getActiveGatewayRuntime();
return {
isPersistent: () => true,
abortCurrent: (sender: string): boolean => {
if (!gateway) return false;
for (const [key, session] of this.sessions) {
if (session.sender === sender && session.abortController) {
return gateway.abortSession(key);
}
}
return false;
},
clearQueue: (sender: string): void => {
for (const session of this.sessions.values()) {
if (session.sender === sender) session.queue.length = 0;
}
},
resetSession: (sender: string): void => {
if (!gateway) return;
for (const [key, session] of this.sessions) {
if (session.sender === sender) {
this.sessions.delete(key);
void gateway.resetSession(key);
}
}
},
};
}
// ── Reply ─────────────────────────────────────────────────
private sendReply(adapter: string, recipient: string, text: string): void {
this.registry.send({ adapter, recipient, text });
}
}
const MAX_ERROR_LENGTH = 200;
/**
* Sanitize subprocess error output for end-user display.
* Strips stack traces, extension crash logs, and long technical details.
*/
function sanitizeError(error: string | undefined): string {
if (!error) return "Something went wrong. Please try again.";
// Extract the most meaningful line — skip "Extension error" noise and stack traces
const lines = error.split("\n").filter((l) => l.trim());
// Find the first line that isn't an extension loading error or stack frame
const meaningful = lines.find(
(l) =>
!l.startsWith("Extension error") &&
!l.startsWith(" at ") &&
!l.startsWith("node:") &&
!l.includes("NODE_MODULE_VERSION") &&
!l.includes("compiled against a different") &&
!l.includes("Emitted 'error' event"),
);
const msg = meaningful?.trim() || "Something went wrong. Please try again.";
return msg.length > MAX_ERROR_LENGTH
? `${msg.slice(0, MAX_ERROR_LENGTH)}`
: msg;
}
function collectImageAttachments(
attachments: QueuedPrompt["attachments"],
): ImageContent[] | undefined {
if (!attachments || attachments.length === 0) {
return undefined;
}
const images = attachments
.filter((attachment) => attachment.type === "image")
.map((attachment) => ({
type: "image" as const,
data: readFileSync(attachment.path).toString("base64"),
mimeType: attachment.mimeType || "image/jpeg",
}));
return images.length > 0 ? images : undefined;
}
function buildPromptText(prompt: QueuedPrompt): string {
if (!prompt.attachments || prompt.attachments.length === 0) {
return prompt.text;
}
const attachmentNotes = prompt.attachments
.filter((attachment) => attachment.type !== "image")
.map((attachment) => {
const label = attachment.filename ?? attachment.path;
return `Attachment (${attachment.type}): ${label}`;
});
if (attachmentNotes.length === 0) {
return prompt.text;
}
return `${prompt.text}\n\n${attachmentNotes.join("\n")}`;
}

View file

@ -0,0 +1,135 @@
/**
* pi-channels Bot command handler.
*
* Detects messages starting with / and handles them without routing
* to the agent. Provides built-in commands and a registry for custom ones.
*
* Built-in: /start, /help, /abort, /status, /new
*/
import type { SenderSession } from "../types.js";
export interface BotCommand {
name: string;
description: string;
handler: (
args: string,
session: SenderSession | undefined,
ctx: CommandContext,
) => string | null;
}
export interface CommandContext {
abortCurrent: (sender: string) => boolean;
clearQueue: (sender: string) => void;
resetSession: (sender: string) => void;
/** Check if a given sender is using persistent (RPC) mode. */
isPersistent: (sender: string) => boolean;
}
const commands = new Map<string, BotCommand>();
export function isCommand(text: string): boolean {
return /^\/[a-zA-Z]/.test(text.trim());
}
export function parseCommand(text: string): { command: string; args: string } {
const match = text.trim().match(/^\/([a-zA-Z_]+)(?:@\S+)?\s*(.*)/s);
if (!match) return { command: "", args: "" };
return { command: match[1].toLowerCase(), args: match[2].trim() };
}
export function registerCommand(cmd: BotCommand): void {
commands.set(cmd.name.toLowerCase(), cmd);
}
export function unregisterCommand(name: string): void {
commands.delete(name.toLowerCase());
}
export function getAllCommands(): BotCommand[] {
return [...commands.values()].sort((a, b) => a.name.localeCompare(b.name));
}
/**
* Handle a command. Returns reply text, or null if unrecognized
* (fall through to agent).
*/
export function handleCommand(
text: string,
session: SenderSession | undefined,
ctx: CommandContext,
): string | null {
const { command } = parseCommand(text);
if (!command) return null;
const cmd = commands.get(command);
if (!cmd) return null;
const { args } = parseCommand(text);
return cmd.handler(args, session, ctx);
}
// ── Built-in commands ───────────────────────────────────────────
registerCommand({
name: "start",
description: "Welcome message",
handler: () =>
"👋 Hi! I'm your Pi assistant.\n\n" +
"Send me a message and I'll process it. Use /help to see available commands.",
});
registerCommand({
name: "help",
description: "Show available commands",
handler: () => {
const lines = getAllCommands().map((c) => `/${c.name}${c.description}`);
return `**Available commands:**\n\n${lines.join("\n")}`;
},
});
registerCommand({
name: "abort",
description: "Cancel the current prompt",
handler: (_args, session, ctx) => {
if (!session) return "No active session.";
if (!session.processing) return "Nothing is running right now.";
return ctx.abortCurrent(session.sender)
? "⏹ Aborting current prompt..."
: "Failed to abort — nothing running.";
},
});
registerCommand({
name: "status",
description: "Show session info",
handler: (_args, session, ctx) => {
if (!session) return "No active session. Send a message to start one.";
const persistent = ctx.isPersistent(session.sender);
const uptime = Math.floor((Date.now() - session.startedAt) / 1000);
const mins = Math.floor(uptime / 60);
const secs = uptime % 60;
return [
`**Session Status**`,
`- Mode: ${persistent ? "🔗 Persistent (conversation memory)" : "⚡ Stateless (no memory)"}`,
`- State: ${session.processing ? "⏳ Processing..." : "💤 Idle"}`,
`- Messages: ${session.messageCount}`,
`- Queue: ${session.queue.length} pending`,
`- Uptime: ${mins > 0 ? `${mins}m ${secs}s` : `${secs}s`}`,
].join("\n");
},
});
registerCommand({
name: "new",
description: "Clear queue and start fresh conversation",
handler: (_args, session, ctx) => {
if (!session) return "No active session.";
const persistent = ctx.isPersistent(session.sender);
ctx.abortCurrent(session.sender);
ctx.clearQueue(session.sender);
ctx.resetSession(session.sender);
return persistent
? "🔄 Session reset. Conversation context cleared. Queue cleared."
: "🔄 Session reset. Queue cleared.";
},
});

View file

@ -0,0 +1,441 @@
/**
* pi-channels Persistent RPC session runner.
*
* Maintains a long-lived `pi --mode rpc` subprocess per sender,
* enabling persistent conversation context across messages.
* Falls back to stateless runner if RPC fails to start.
*
* Lifecycle:
* 1. First message from a sender spawns a new RPC subprocess
* 2. Subsequent messages reuse the same subprocess (session persists)
* 3. /new command or idle timeout restarts the session
* 4. Subprocess crash triggers auto-restart on next message
*/
import { type ChildProcess, spawn } from "node:child_process";
import * as readline from "node:readline";
import type { IncomingAttachment, RunResult } from "../types.js";
export interface RpcRunnerOptions {
cwd: string;
model?: string | null;
timeoutMs: number;
extensions?: string[];
}
interface PendingRequest {
resolve: (result: RunResult) => void;
startTime: number;
timer: ReturnType<typeof setTimeout>;
textChunks: string[];
abortHandler?: () => void;
}
/**
* A persistent RPC session for a single sender.
* Wraps a `pi --mode rpc` subprocess.
*/
export class RpcSession {
private child: ChildProcess | null = null;
private rl: readline.Interface | null = null;
private options: RpcRunnerOptions;
private pending: PendingRequest | null = null;
private ready = false;
private startedAt = 0;
private _onStreaming: ((text: string) => void) | null = null;
constructor(options: RpcRunnerOptions) {
this.options = options;
}
/** Spawn the RPC subprocess if not already running. */
async start(): Promise<boolean> {
if (this.child && this.ready) return true;
this.cleanup();
const args = ["--mode", "rpc", "--no-extensions"];
if (this.options.model) args.push("--model", this.options.model);
if (this.options.extensions?.length) {
for (const ext of this.options.extensions) {
args.push("-e", ext);
}
}
try {
this.child = spawn("pi", args, {
cwd: this.options.cwd,
stdio: ["pipe", "pipe", "pipe"],
env: { ...process.env },
});
} catch {
return false;
}
if (!this.child.stdout || !this.child.stdin) {
this.cleanup();
return false;
}
this.rl = readline.createInterface({ input: this.child.stdout });
this.rl.on("line", (line) => this.handleLine(line));
this.child.on("close", () => {
this.ready = false;
// Reject any pending request
if (this.pending) {
const p = this.pending;
this.pending = null;
clearTimeout(p.timer);
const text = p.textChunks.join("");
p.resolve({
ok: false,
response: text || "(session ended)",
error: "RPC subprocess exited unexpectedly",
durationMs: Date.now() - p.startTime,
exitCode: 1,
});
}
this.child = null;
this.rl = null;
});
this.child.on("error", () => {
this.cleanup();
});
this.ready = true;
this.startedAt = Date.now();
return true;
}
/** Send a prompt and collect the full response. */
runPrompt(
prompt: string,
options?: {
signal?: AbortSignal;
attachments?: IncomingAttachment[];
onStreaming?: (text: string) => void;
},
): Promise<RunResult> {
return new Promise((resolve) => {
void (async () => {
// Ensure subprocess is running
if (!this.ready) {
const ok = await this.start();
if (!ok) {
resolve({
ok: false,
response: "",
error: "Failed to start RPC session",
durationMs: 0,
exitCode: 1,
});
return;
}
}
const startTime = Date.now();
this._onStreaming = options?.onStreaming ?? null;
// Timeout
const timer = setTimeout(() => {
if (this.pending) {
const p = this.pending;
this.pending = null;
const text = p.textChunks.join("");
p.resolve({
ok: false,
response: text || "(timed out)",
error: "Timeout",
durationMs: Date.now() - p.startTime,
exitCode: 124,
});
// Kill and restart on next message
this.cleanup();
}
}, this.options.timeoutMs);
this.pending = { resolve, startTime, timer, textChunks: [] };
// Abort handler
const onAbort = () => {
this.sendCommand({ type: "abort" });
};
if (options?.signal) {
if (options.signal.aborted) {
clearTimeout(timer);
this.pending = null;
this.sendCommand({ type: "abort" });
resolve({
ok: false,
response: "(aborted)",
error: "Aborted by user",
durationMs: Date.now() - startTime,
exitCode: 130,
});
return;
}
options.signal.addEventListener("abort", onAbort, { once: true });
this.pending.abortHandler = () =>
options.signal?.removeEventListener("abort", onAbort);
}
// Build prompt command
const cmd: Record<string, unknown> = {
type: "prompt",
message: prompt,
};
// Attach images as base64
if (options?.attachments?.length) {
const images: Array<Record<string, string>> = [];
for (const att of options.attachments) {
if (att.type === "image") {
try {
const fs = await import("node:fs");
const data = fs.readFileSync(att.path).toString("base64");
images.push({
type: "image",
data,
mimeType: att.mimeType || "image/jpeg",
});
} catch {
// Skip unreadable attachments
}
}
}
if (images.length > 0) cmd.images = images;
}
this.sendCommand(cmd);
})();
});
}
/** Request a new session (clear context). */
async newSession(): Promise<void> {
if (this.ready) {
this.sendCommand({ type: "new_session" });
}
}
/** Check if the subprocess is alive. */
isAlive(): boolean {
return this.ready && this.child !== null;
}
/** Get uptime in ms. */
uptime(): number {
return this.ready ? Date.now() - this.startedAt : 0;
}
/** Kill the subprocess. */
cleanup(): void {
this.ready = false;
this._onStreaming = null;
if (this.pending) {
clearTimeout(this.pending.timer);
this.pending.abortHandler?.();
this.pending = null;
}
if (this.rl) {
this.rl.close();
this.rl = null;
}
if (this.child) {
this.child.kill("SIGTERM");
setTimeout(() => {
if (this.child && !this.child.killed) this.child.kill("SIGKILL");
}, 3000);
this.child = null;
}
}
// ── Private ─────────────────────────────────────────────
private sendCommand(cmd: Record<string, unknown>): void {
if (!this.child?.stdin?.writable) return;
this.child.stdin.write(`${JSON.stringify(cmd)}\n`);
}
private handleLine(line: string): void {
let event: Record<string, unknown>;
try {
event = JSON.parse(line);
} catch {
return;
}
const type = event.type as string;
// Streaming text deltas
if (type === "message_update") {
const delta = event.assistantMessageEvent as
| Record<string, unknown>
| undefined;
if (delta?.type === "text_delta" && typeof delta.delta === "string") {
if (this.pending) this.pending.textChunks.push(delta.delta);
if (this._onStreaming) this._onStreaming(delta.delta);
}
}
// Agent finished — resolve the pending promise
if (type === "agent_end") {
if (this.pending) {
const p = this.pending;
this.pending = null;
this._onStreaming = null;
clearTimeout(p.timer);
p.abortHandler?.();
const text = p.textChunks.join("").trim();
p.resolve({
ok: true,
response: text || "(no output)",
durationMs: Date.now() - p.startTime,
exitCode: 0,
});
}
}
// Handle errors in message_update (aborted, error)
if (type === "message_update") {
const delta = event.assistantMessageEvent as
| Record<string, unknown>
| undefined;
if (delta?.type === "done" && delta.reason === "error") {
if (this.pending) {
const p = this.pending;
this.pending = null;
this._onStreaming = null;
clearTimeout(p.timer);
p.abortHandler?.();
const text = p.textChunks.join("").trim();
p.resolve({
ok: false,
response: text || "",
error: "Agent error",
durationMs: Date.now() - p.startTime,
exitCode: 1,
});
}
}
}
// Prompt response (just ack, actual result comes via agent_end)
// Response errors
if (type === "response") {
const success = event.success as boolean;
if (!success && this.pending) {
const p = this.pending;
this.pending = null;
this._onStreaming = null;
clearTimeout(p.timer);
p.abortHandler?.();
p.resolve({
ok: false,
response: "",
error: (event.error as string) || "RPC command failed",
durationMs: Date.now() - p.startTime,
exitCode: 1,
});
}
}
}
}
/**
* Manages RPC sessions across multiple senders.
* Each sender gets their own persistent subprocess.
*/
export class RpcSessionManager {
private sessions = new Map<string, RpcSession>();
private options: RpcRunnerOptions;
private idleTimeoutMs: number;
private idleTimers = new Map<string, ReturnType<typeof setTimeout>>();
constructor(
options: RpcRunnerOptions,
idleTimeoutMs = 30 * 60_000, // 30 min default
) {
this.options = options;
this.idleTimeoutMs = idleTimeoutMs;
}
/** Get or create a session for a sender. */
async getSession(senderKey: string): Promise<RpcSession> {
let session = this.sessions.get(senderKey);
if (session?.isAlive()) {
this.resetIdleTimer(senderKey);
return session;
}
// Clean up dead session
if (session) {
session.cleanup();
this.sessions.delete(senderKey);
}
// Create new
session = new RpcSession(this.options);
const ok = await session.start();
if (!ok) throw new Error("Failed to start RPC session");
this.sessions.set(senderKey, session);
this.resetIdleTimer(senderKey);
return session;
}
/** Reset a sender's session (new conversation). */
async resetSession(senderKey: string): Promise<void> {
const session = this.sessions.get(senderKey);
if (session) {
await session.newSession();
}
}
/** Kill a specific sender's session. */
killSession(senderKey: string): void {
const session = this.sessions.get(senderKey);
if (session) {
session.cleanup();
this.sessions.delete(senderKey);
}
const timer = this.idleTimers.get(senderKey);
if (timer) {
clearTimeout(timer);
this.idleTimers.delete(senderKey);
}
}
/** Kill all sessions. */
killAll(): void {
for (const session of this.sessions.values()) {
session.cleanup();
}
this.sessions.clear();
for (const timer of this.idleTimers.values()) {
clearTimeout(timer);
}
this.idleTimers.clear();
}
/** Get stats. */
getStats(): { activeSessions: number; senders: string[] } {
return {
activeSessions: this.sessions.size,
senders: [...this.sessions.keys()],
};
}
private resetIdleTimer(senderKey: string): void {
const existing = this.idleTimers.get(senderKey);
if (existing) clearTimeout(existing);
const timer = setTimeout(() => {
this.killSession(senderKey);
}, this.idleTimeoutMs);
this.idleTimers.set(senderKey, timer);
}
}

View file

@ -0,0 +1,136 @@
/**
* pi-channels Subprocess runner for the chat bridge.
*
* Spawns `pi -p --no-session [@files...] <prompt>` to process a single prompt.
* Supports file attachments (images, documents) via the @file syntax.
* Same pattern as pi-cron and pi-heartbeat.
*/
import { type ChildProcess, spawn } from "node:child_process";
import type { IncomingAttachment, RunResult } from "../types.js";
export interface RunOptions {
prompt: string;
cwd: string;
timeoutMs: number;
model?: string | null;
signal?: AbortSignal;
/** File attachments to include via @file args. */
attachments?: IncomingAttachment[];
/** Explicit extension paths to load (with --no-extensions + -e for each). */
extensions?: string[];
}
export function runPrompt(options: RunOptions): Promise<RunResult> {
const { prompt, cwd, timeoutMs, model, signal, attachments, extensions } =
options;
return new Promise((resolve) => {
const startTime = Date.now();
const args = ["-p", "--no-session", "--no-extensions"];
if (model) args.push("--model", model);
// Explicitly load only bridge-safe extensions
if (extensions?.length) {
for (const ext of extensions) {
args.push("-e", ext);
}
}
// Add file attachments as @file args before the prompt
if (attachments?.length) {
for (const att of attachments) {
args.push(`@${att.path}`);
}
}
args.push(prompt);
let child: ChildProcess;
try {
child = spawn("pi", args, {
cwd,
stdio: ["ignore", "pipe", "pipe"],
env: { ...process.env },
timeout: timeoutMs,
});
} catch (err: any) {
resolve({
ok: false,
response: "",
error: `Failed to spawn: ${err.message}`,
durationMs: Date.now() - startTime,
exitCode: 1,
});
return;
}
let stdout = "";
let stderr = "";
child.stdout?.on("data", (chunk: Buffer) => {
stdout += chunk.toString();
});
child.stderr?.on("data", (chunk: Buffer) => {
stderr += chunk.toString();
});
const onAbort = () => {
child.kill("SIGTERM");
setTimeout(() => {
if (!child.killed) child.kill("SIGKILL");
}, 3000);
};
if (signal) {
if (signal.aborted) {
onAbort();
} else {
signal.addEventListener("abort", onAbort, { once: true });
}
}
child.on("close", (code) => {
signal?.removeEventListener("abort", onAbort);
const durationMs = Date.now() - startTime;
const response = stdout.trim();
const exitCode = code ?? 1;
if (signal?.aborted) {
resolve({
ok: false,
response: response || "(aborted)",
error: "Aborted by user",
durationMs,
exitCode: 130,
});
} else if (exitCode !== 0) {
resolve({
ok: false,
response,
error: stderr.trim() || `Exit code ${exitCode}`,
durationMs,
exitCode,
});
} else {
resolve({
ok: true,
response: response || "(no output)",
durationMs,
exitCode: 0,
});
}
});
child.on("error", (err) => {
signal?.removeEventListener("abort", onAbort);
resolve({
ok: false,
response: "",
error: err.message,
durationMs: Date.now() - startTime,
exitCode: 1,
});
});
});
}

View file

@ -0,0 +1,35 @@
/**
* pi-channels Typing indicator manager.
*
* Sends periodic typing chat actions via the adapter's sendTyping method.
* Telegram typing indicators expire after ~5s, so we refresh every 4s.
* For adapters without sendTyping, this is a no-op.
*/
import type { ChannelAdapter } from "../types.js";
const TYPING_INTERVAL_MS = 4_000;
/**
* Start sending typing indicators. Returns a stop() handle.
* No-op if the adapter doesn't support sendTyping.
*/
export function startTyping(
adapter: ChannelAdapter | undefined,
recipient: string,
): { stop: () => void } {
if (!adapter?.sendTyping) return { stop() {} };
// Fire immediately
adapter.sendTyping(recipient).catch(() => {});
const timer = setInterval(() => {
adapter.sendTyping!(recipient).catch(() => {});
}, TYPING_INTERVAL_MS);
return {
stop() {
clearInterval(timer);
},
};
}

View file

@ -0,0 +1,94 @@
/**
* pi-channels Config from pi SettingsManager.
*
* Reads the "pi-channels" key from settings via SettingsManager,
* which merges global (~/.pi/agent/settings.json) and project
* (.pi/settings.json) configs automatically.
*
* Example settings.json:
* {
* "pi-channels": {
* "adapters": {
* "telegram": {
* "type": "telegram",
* "botToken": "your-telegram-bot-token"
* },
* "slack": {
* "type": "slack"
* }
* },
* "slack": {
* "appToken": "xapp-...",
* "botToken": "xoxb-..."
* },
* "routes": {
* "ops": { "adapter": "telegram", "recipient": "-100987654321" }
* }
* }
* }
*/
import { getAgentDir, SettingsManager } from "@mariozechner/pi-coding-agent";
import type { ChannelConfig } from "./types.js";
const SETTINGS_KEY = "pi-channels";
export function loadConfig(cwd: string): ChannelConfig {
const agentDir = getAgentDir();
const sm = SettingsManager.create(cwd, agentDir);
const global = sm.getGlobalSettings() as Record<string, any>;
const project = sm.getProjectSettings() as Record<string, any>;
const globalCh = global?.[SETTINGS_KEY] ?? {};
const projectCh = project?.[SETTINGS_KEY] ?? {};
// Project overrides global (shallow merge of adapters + routes + bridge)
const merged: ChannelConfig = {
adapters: {
...(globalCh.adapters ?? {}),
...(projectCh.adapters ?? {}),
} as ChannelConfig["adapters"],
routes: {
...(globalCh.routes ?? {}),
...(projectCh.routes ?? {}),
},
bridge: {
...(globalCh.bridge ?? {}),
...(projectCh.bridge ?? {}),
} as ChannelConfig["bridge"],
};
return merged;
}
/**
* Read a setting from the "pi-channels" config by dotted key path.
* Useful for adapter-specific secrets that shouldn't live in the adapter config block.
*
* Example: getChannelSetting(cwd, "slack.appToken") reads pi-channels.slack.appToken
*/
export function getChannelSetting(cwd: string, keyPath: string): unknown {
const agentDir = getAgentDir();
const sm = SettingsManager.create(cwd, agentDir);
const global = sm.getGlobalSettings() as Record<string, any>;
const project = sm.getProjectSettings() as Record<string, any>;
const globalCh = global?.[SETTINGS_KEY] ?? {};
const projectCh = project?.[SETTINGS_KEY] ?? {};
// Walk the dotted path independently in each scope to avoid
// shallow-merge dropping sibling keys from nested objects.
function walk(obj: any): unknown {
let current: any = obj;
for (const part of keyPath.split(".")) {
if (current == null || typeof current !== "object") return undefined;
current = current[part];
}
return current;
}
// Project overrides global at the leaf level.
// Use explicit undefined check so null can be used to unset a global default.
const projectValue = walk(projectCh);
return projectValue !== undefined ? projectValue : walk(globalCh);
}

View file

@ -0,0 +1,133 @@
/**
* pi-channels Event API registration.
*
* Events emitted:
* channel:receive incoming message from an external adapter
*
* Events listened to:
* cron:job_complete auto-routes cron output to channels
* channel:send send a message via an adapter
* channel:register register a custom adapter
* channel:remove remove an adapter
* channel:list list adapters + routes
* channel:test test an adapter with a ping
* bridge:* chat bridge lifecycle events
*/
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import type { ChatBridge } from "./bridge/bridge.js";
import type { ChannelRegistry } from "./registry.js";
import type {
ChannelAdapter,
ChannelMessage,
IncomingMessage,
} from "./types.js";
/** Reference to the active bridge, set by index.ts after construction. */
let activeBridge: ChatBridge | null = null;
export function setBridge(bridge: ChatBridge | null): void {
activeBridge = bridge;
}
export function registerChannelEvents(
pi: ExtensionAPI,
registry: ChannelRegistry,
): void {
// ── Incoming messages → channel:receive (+ bridge) ──────
registry.setOnIncoming((message: IncomingMessage) => {
pi.events.emit("channel:receive", message);
// Route to bridge if active
if (activeBridge?.isActive()) {
activeBridge.handleMessage(message);
}
});
// ── Auto-route cron job output ──────────────────────────
pi.events.on("cron:job_complete", (raw: unknown) => {
const event = raw as {
job: { name: string; channel: string; prompt: string };
response?: string;
ok: boolean;
error?: string;
durationMs: number;
};
if (!event.job.channel) return;
if (!event.response && !event.error) return;
const text = event.ok
? (event.response ?? "(no output)")
: `❌ Error: ${event.error ?? "unknown"}`;
registry.send({
adapter: event.job.channel,
recipient: "",
text,
source: `cron:${event.job.name}`,
metadata: { durationMs: event.durationMs, ok: event.ok },
});
});
// ── channel:send — deliver a message ─────────────────────
pi.events.on("channel:send", (raw: unknown) => {
const data = raw as ChannelMessage & {
callback?: (result: { ok: boolean; error?: string }) => void;
};
registry.send(data).then((r) => data.callback?.(r));
});
// ── channel:register — add a custom adapter ──────────────
pi.events.on("channel:register", (raw: unknown) => {
const data = raw as {
name: string;
adapter: ChannelAdapter;
callback?: (ok: boolean) => void;
};
if (!data.name || !data.adapter) {
data.callback?.(false);
return;
}
registry.register(data.name, data.adapter);
data.callback?.(true);
});
// ── channel:remove — remove an adapter ───────────────────
pi.events.on("channel:remove", (raw: unknown) => {
const data = raw as { name: string; callback?: (ok: boolean) => void };
data.callback?.(registry.unregister(data.name));
});
// ── channel:list — list adapters + routes ────────────────
pi.events.on("channel:list", (raw: unknown) => {
const data = raw as {
callback?: (items: ReturnType<ChannelRegistry["list"]>) => void;
};
data.callback?.(registry.list());
});
// ── channel:test — send a test ping ──────────────────────
pi.events.on("channel:test", (raw: unknown) => {
const data = raw as {
adapter: string;
recipient: string;
callback?: (result: { ok: boolean; error?: string }) => void;
};
registry
.send({
adapter: data.adapter,
recipient: data.recipient ?? "",
text: `🏓 pi-channels test — ${new Date().toISOString()}`,
source: "channel:test",
})
.then((r) => data.callback?.(r));
});
}

View file

@ -0,0 +1,168 @@
/**
* pi-channels Two-way channel extension for pi.
*
* Routes messages between agents and external services
* (Telegram, webhooks, custom adapters).
*
* Built-in adapters: telegram (bidirectional), webhook (outgoing)
* Custom adapters: register via pi.events.emit("channel:register", ...)
*
* Chat bridge: when enabled, incoming messages are routed to the agent
* as isolated subprocess prompts and responses are sent back. Enable via:
* - --chat-bridge flag
* - /chat-bridge on command
* - settings.json: { "pi-channels": { "bridge": { "enabled": true } } }
*
* Config in settings.json under "pi-channels":
* {
* "pi-channels": {
* "adapters": {
* "telegram": { "type": "telegram", "botToken": "your-telegram-bot-token", "polling": true }
* },
* "routes": {
* "ops": { "adapter": "telegram", "recipient": "-100987654321" }
* },
* "bridge": {
* "enabled": false,
* "maxQueuePerSender": 5,
* "timeoutMs": 300000,
* "maxConcurrent": 2,
* "typingIndicators": true,
* "commands": true
* }
* }
* }
*/
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { ChatBridge } from "./bridge/bridge.js";
import { loadConfig } from "./config.js";
import { registerChannelEvents, setBridge } from "./events.js";
import { createLogger } from "./logger.js";
import { ChannelRegistry } from "./registry.js";
import { registerChannelTool } from "./tool.js";
export default function (pi: ExtensionAPI) {
const log = createLogger(pi);
const registry = new ChannelRegistry();
registry.setLogger(log);
let bridge: ChatBridge | null = null;
// ── Flag: --chat-bridge ───────────────────────────────────
pi.registerFlag("chat-bridge", {
description:
"Enable the chat bridge on startup (incoming messages → agent → reply)",
type: "boolean",
default: false,
});
// ── Event API + cron integration ──────────────────────────
registerChannelEvents(pi, registry);
// ── Lifecycle ─────────────────────────────────────────────
pi.on("session_start", async (_event, ctx) => {
const config = loadConfig(ctx.cwd);
registry.loadConfig(config, ctx.cwd);
const errors = registry.getErrors();
for (const err of errors) {
ctx.ui.notify(`pi-channels: ${err.adapter}: ${err.error}`, "warning");
log("adapter-error", { adapter: err.adapter, error: err.error }, "ERROR");
}
log("init", {
adapters: Object.keys(config.adapters ?? {}),
routes: Object.keys(config.routes ?? {}),
});
// Start incoming/bidirectional adapters
await registry.startListening();
const startErrors = registry
.getErrors()
.filter((e) => e.error.startsWith("Failed to start"));
for (const err of startErrors) {
ctx.ui.notify(`pi-channels: ${err.adapter}: ${err.error}`, "warning");
}
// Initialize bridge
bridge = new ChatBridge(config.bridge, ctx.cwd, registry, pi.events, log);
setBridge(bridge);
const flagEnabled = pi.getFlag("--chat-bridge");
if (flagEnabled || config.bridge?.enabled) {
bridge.start();
log("bridge-start", {});
ctx.ui.notify("pi-channels: Chat bridge started", "info");
}
});
pi.on("session_shutdown", async () => {
if (bridge?.isActive()) log("bridge-stop", {});
bridge?.stop();
setBridge(null);
await registry.stopAll();
});
// ── Command: /chat-bridge ─────────────────────────────────
pi.registerCommand("chat-bridge", {
description: "Manage chat bridge: /chat-bridge [on|off|status]",
getArgumentCompletions: (prefix: string) => {
return ["on", "off", "status"]
.filter((c) => c.startsWith(prefix))
.map((c) => ({ value: c, label: c }));
},
handler: async (args, ctx) => {
const cmd = args?.trim().toLowerCase();
if (cmd === "on") {
if (!bridge) {
ctx.ui.notify(
"Chat bridge not initialized — no channel config?",
"warning",
);
return;
}
if (bridge.isActive()) {
ctx.ui.notify("Chat bridge is already running.", "info");
return;
}
bridge.start();
ctx.ui.notify("✓ Chat bridge started", "info");
return;
}
if (cmd === "off") {
if (!bridge?.isActive()) {
ctx.ui.notify("Chat bridge is not running.", "info");
return;
}
bridge.stop();
ctx.ui.notify("✓ Chat bridge stopped", "info");
return;
}
// Default: status
if (!bridge) {
ctx.ui.notify("Chat bridge: not initialized", "info");
return;
}
const stats = bridge.getStats();
const lines = [
`Chat bridge: ${stats.active ? "🟢 Active" : "⚪ Inactive"}`,
`Sessions: ${stats.sessions}`,
`Active prompts: ${stats.activePrompts}`,
`Queued: ${stats.totalQueued}`,
];
ctx.ui.notify(lines.join("\n"), "info");
},
});
// ── LLM tool ──────────────────────────────────────────────
registerChannelTool(pi, registry);
}

View file

@ -0,0 +1,8 @@
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
const CHANNEL = "channels";
export function createLogger(pi: ExtensionAPI) {
return (event: string, data: unknown, level = "INFO") =>
pi.events.emit("log", { channel: CHANNEL, event, level, data });
}

View file

@ -0,0 +1,234 @@
/**
* pi-channels Adapter registry + route resolution.
*/
import { createSlackAdapter } from "./adapters/slack.js";
import { createTelegramAdapter } from "./adapters/telegram.js";
import { createWebhookAdapter } from "./adapters/webhook.js";
import type {
AdapterConfig,
AdapterDirection,
ChannelAdapter,
ChannelConfig,
ChannelMessage,
IncomingMessage,
OnIncomingMessage,
} from "./types.js";
// ── Built-in adapter factories ──────────────────────────────────
export type AdapterLogger = (
event: string,
data: Record<string, unknown>,
level?: string,
) => void;
type AdapterFactory = (
config: AdapterConfig,
cwd?: string,
log?: AdapterLogger,
) => ChannelAdapter;
const builtinFactories: Record<string, AdapterFactory> = {
telegram: createTelegramAdapter,
webhook: createWebhookAdapter,
slack: createSlackAdapter,
};
// ── Registry ────────────────────────────────────────────────────
export class ChannelRegistry {
private adapters = new Map<string, ChannelAdapter>();
private routes = new Map<string, { adapter: string; recipient: string }>();
private errors: Array<{ adapter: string; error: string }> = [];
private onIncoming: OnIncomingMessage = () => {};
private log?: AdapterLogger;
/**
* Set the callback for incoming messages (called by the extension entry).
*/
setOnIncoming(cb: OnIncomingMessage): void {
this.onIncoming = cb;
}
/**
* Set the logger for adapter error reporting.
*/
setLogger(log: AdapterLogger): void {
this.log = log;
}
/**
* Load adapters + routes from config. Custom adapters (registered via events) are preserved.
* @param cwd working directory, passed to adapter factories for settings resolution.
*/
loadConfig(config: ChannelConfig, cwd?: string): void {
this.errors = [];
// Stop existing adapters
for (const adapter of this.adapters.values()) {
adapter.stop?.();
}
// Preserve custom adapters (prefixed with "custom:")
const custom = new Map<string, ChannelAdapter>();
for (const [name, adapter] of this.adapters) {
if (name.startsWith("custom:")) custom.set(name, adapter);
}
this.adapters = custom;
// Load routes
this.routes.clear();
if (config.routes) {
for (const [alias, target] of Object.entries(config.routes)) {
this.routes.set(alias, target);
}
}
// Create adapters from config
for (const [name, adapterConfig] of Object.entries(config.adapters)) {
const factory = builtinFactories[adapterConfig.type];
if (!factory) {
this.errors.push({
adapter: name,
error: `Unknown adapter type: ${adapterConfig.type}`,
});
continue;
}
try {
this.adapters.set(name, factory(adapterConfig, cwd, this.log));
} catch (err: any) {
this.errors.push({ adapter: name, error: err.message });
}
}
}
/** Start all incoming/bidirectional adapters. */
async startListening(): Promise<void> {
for (const [name, adapter] of this.adapters) {
if (
(adapter.direction === "incoming" ||
adapter.direction === "bidirectional") &&
adapter.start
) {
try {
await adapter.start((msg: IncomingMessage) => {
this.onIncoming({ ...msg, adapter: name });
});
} catch (err: any) {
this.errors.push({
adapter: name,
error: `Failed to start: ${err.message}`,
});
}
}
}
}
/** Stop all adapters. */
async stopAll(): Promise<void> {
for (const adapter of this.adapters.values()) {
await adapter.stop?.();
}
}
/** Register a custom adapter (from another extension). */
register(name: string, adapter: ChannelAdapter): void {
this.adapters.set(name, adapter);
// Auto-start if it receives
if (
(adapter.direction === "incoming" ||
adapter.direction === "bidirectional") &&
adapter.start
) {
adapter.start((msg: IncomingMessage) => {
this.onIncoming({ ...msg, adapter: name });
});
}
}
/** Unregister an adapter. */
unregister(name: string): boolean {
const adapter = this.adapters.get(name);
adapter?.stop?.();
return this.adapters.delete(name);
}
/**
* Send a message. Resolves routes, validates adapter supports sending.
*/
async send(
message: ChannelMessage,
): Promise<{ ok: boolean; error?: string }> {
let adapterName = message.adapter;
let recipient = message.recipient;
// Check if this is a route alias
const route = this.routes.get(adapterName);
if (route) {
adapterName = route.adapter;
if (!recipient) recipient = route.recipient;
}
const adapter = this.adapters.get(adapterName);
if (!adapter) {
return { ok: false, error: `No adapter "${adapterName}"` };
}
if (adapter.direction === "incoming") {
return {
ok: false,
error: `Adapter "${adapterName}" is incoming-only, cannot send`,
};
}
if (!adapter.send) {
return {
ok: false,
error: `Adapter "${adapterName}" has no send method`,
};
}
try {
await adapter.send({ ...message, adapter: adapterName, recipient });
return { ok: true };
} catch (err: any) {
return { ok: false, error: err.message };
}
}
/** List all registered adapters and route aliases. */
list(): Array<{
name: string;
type: "adapter" | "route";
direction?: AdapterDirection;
target?: string;
}> {
const result: Array<{
name: string;
type: "adapter" | "route";
direction?: AdapterDirection;
target?: string;
}> = [];
for (const [name, adapter] of this.adapters) {
result.push({ name, type: "adapter", direction: adapter.direction });
}
for (const [alias, target] of this.routes) {
result.push({
name: alias,
type: "route",
target: `${target.adapter}${target.recipient}`,
});
}
return result;
}
getErrors(): Array<{ adapter: string; error: string }> {
return [...this.errors];
}
/** Get an adapter by name (for direct access, e.g. typing indicators). */
getAdapter(name: string): ChannelAdapter | undefined {
return this.adapters.get(name);
}
}

View file

@ -0,0 +1,113 @@
/**
* pi-channels LLM tool registration.
*/
import { StringEnum } from "@mariozechner/pi-ai";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { Type } from "@sinclair/typebox";
import type { ChannelRegistry } from "./registry.js";
interface ChannelToolParams {
action: "send" | "list" | "test";
adapter?: string;
recipient?: string;
text?: string;
source?: string;
}
export function registerChannelTool(
pi: ExtensionAPI,
registry: ChannelRegistry,
): void {
pi.registerTool({
name: "notify",
label: "Channel",
description:
"Send notifications via configured adapters (Telegram, webhooks, custom). " +
"Actions: send (deliver a message), list (show adapters + routes), test (send a ping).",
parameters: Type.Object({
action: StringEnum(["send", "list", "test"] as const, {
description: "Action to perform",
}) as any,
adapter: Type.Optional(
Type.String({
description: "Adapter name or route alias (required for send, test)",
}),
),
recipient: Type.Optional(
Type.String({
description:
"Recipient — chat ID, webhook URL, etc. (required for send unless using a route)",
}),
),
text: Type.Optional(
Type.String({ description: "Message text (required for send)" }),
),
source: Type.Optional(
Type.String({ description: "Source label (optional)" }),
),
}) as any,
async execute(_toolCallId, _params) {
const params = _params as ChannelToolParams;
let result: string;
switch (params.action) {
case "list": {
const items = registry.list();
if (items.length === 0) {
result =
'No adapters configured. Add "pi-channels" to your settings.json.';
} else {
const lines = items.map((i) =>
i.type === "route"
? `- **${i.name}** (route → ${i.target})`
: `- **${i.name}** (${i.direction ?? "adapter"})`,
);
result = `**Channel (${items.length}):**\n${lines.join("\n")}`;
}
break;
}
case "send": {
if (!params.adapter || !params.text) {
result = "Missing required fields: adapter and text.";
break;
}
const r = await registry.send({
adapter: params.adapter,
recipient: params.recipient ?? "",
text: params.text,
source: params.source,
});
result = r.ok
? `✓ Sent via "${params.adapter}"${params.recipient ? ` to ${params.recipient}` : ""}`
: `Failed: ${r.error}`;
break;
}
case "test": {
if (!params.adapter) {
result = "Missing required field: adapter.";
break;
}
const r = await registry.send({
adapter: params.adapter,
recipient: params.recipient ?? "",
text: `🏓 pi-channels test — ${new Date().toISOString()}`,
source: "channel:test",
});
result = r.ok
? `✓ Test sent via "${params.adapter}"${params.recipient ? ` to ${params.recipient}` : ""}`
: `Failed: ${r.error}`;
break;
}
default:
result = `Unknown action: ${(params as any).action}`;
}
return {
content: [{ type: "text" as const, text: result }],
details: {},
};
},
});
}

View file

@ -0,0 +1,197 @@
/**
* pi-channels Shared types.
*/
// ── Channel message ─────────────────────────────────────────────
export interface ChannelMessage {
/** Adapter name: "telegram", "webhook", or a custom adapter */
adapter: string;
/** Recipient — adapter-specific (chat ID, webhook URL, email address, etc.) */
recipient: string;
/** Message text to deliver */
text: string;
/** Where this came from (e.g. "cron:daily-standup") */
source?: string;
/** Arbitrary metadata for adapter handlers */
metadata?: Record<string, unknown>;
}
// ── Incoming message (from external → pi) ───────────────────────
export interface IncomingAttachment {
/** Attachment type */
type: "image" | "document" | "audio";
/** Local file path (temporary, downloaded by the adapter) */
path: string;
/** Original filename (if available) */
filename?: string;
/** MIME type */
mimeType?: string;
/** File size in bytes */
size?: number;
}
// ── Transcription config ────────────────────────────────────────
export interface TranscriptionConfig {
/** Enable voice/audio transcription (default: false) */
enabled: boolean;
/**
* Transcription provider:
* - "apple" macOS SFSpeechRecognizer (free, offline, no API key)
* - "openai" Whisper API
* - "elevenlabs" Scribe API
*/
provider: "apple" | "openai" | "elevenlabs";
/** API key for cloud providers (supports env:VAR_NAME). Not needed for apple. */
apiKey?: string;
/** Model name (e.g. "whisper-1", "scribe_v1"). Provider-specific default used if omitted. */
model?: string;
/** ISO 639-1 language hint (e.g. "en", "no"). Optional. */
language?: string;
}
export interface IncomingMessage {
/** Which adapter received this */
adapter: string;
/** Who sent it (chat ID, user ID, etc.) */
sender: string;
/** Message text */
text: string;
/** File attachments (images, documents) */
attachments?: IncomingAttachment[];
/** Adapter-specific metadata (message ID, username, timestamp, etc.) */
metadata?: Record<string, unknown>;
}
// ── Adapter direction ───────────────────────────────────────────
export type AdapterDirection = "outgoing" | "incoming" | "bidirectional";
/** Callback for adapters to emit incoming messages */
export type OnIncomingMessage = (message: IncomingMessage) => void;
// ── Adapter handler ─────────────────────────────────────────────
export interface ChannelAdapter {
/** What this adapter supports */
direction: AdapterDirection;
/** Send a message outward. Required for outgoing/bidirectional. */
send?(message: ChannelMessage): Promise<void>;
/** Start listening for incoming messages. Required for incoming/bidirectional. */
start?(onMessage: OnIncomingMessage): Promise<void>;
/** Stop listening. */
stop?(): Promise<void>;
/**
* Send a typing/processing indicator.
* Optional only supported by adapters that have real-time presence (e.g. Telegram).
*/
sendTyping?(recipient: string): Promise<void>;
}
// ── Config (lives under "pi-channels" key in pi settings.json) ──
export interface AdapterConfig {
type: string;
[key: string]: unknown;
}
export interface BridgeConfig {
/** Enable the chat bridge (default: false). Also enabled via --chat-bridge flag. */
enabled?: boolean;
/**
* Default session mode (default: "persistent").
*
* - "persistent" long-lived `pi --mode rpc` subprocess with conversation memory
* - "stateless" isolated `pi -p --no-session` subprocess per message (no memory)
*
* Can be overridden per sender via `sessionRules`.
*/
sessionMode?: "persistent" | "stateless";
/**
* Per-sender session mode overrides.
* Each rule matches sender keys (`adapter:senderId`) against glob patterns.
* First match wins. Unmatched senders use `sessionMode` default.
*
* Examples:
* - `{ "match": "telegram:-100*", "mode": "stateless" }` group chats stateless
* - `{ "match": "webhook:*", "mode": "stateless" }` all webhooks stateless
* - `{ "match": "telegram:123456789", "mode": "persistent" }` specific user persistent
*/
sessionRules?: Array<{ match: string; mode: "persistent" | "stateless" }>;
/**
* Idle timeout in minutes for persistent sessions (default: 30).
* After this period of inactivity, the sender's RPC subprocess is killed.
* A new one is spawned on the next message.
*/
idleTimeoutMinutes?: number;
/** Max queued messages per sender before rejecting (default: 5). */
maxQueuePerSender?: number;
/** Subprocess timeout in ms (default: 300000 = 5 min). */
timeoutMs?: number;
/** Max senders processed concurrently (default: 2). */
maxConcurrent?: number;
/** Model override for subprocess (default: null = use default). */
model?: string | null;
/** Send typing indicators while processing (default: true). */
typingIndicators?: boolean;
/** Handle bot commands like /start, /help, /abort (default: true). */
commands?: boolean;
/**
* Extension paths to load in bridge subprocesses.
* Subprocess runs with --no-extensions by default (avoids loading
* extensions that crash or conflict, e.g. webserver port collisions).
* List only the extensions the bridge agent actually needs.
*
* Example: ["/Users/you/Dev/pi/extensions/pi-vault/src/index.ts"]
*/
extensions?: string[];
}
export interface ChannelConfig {
/** Named adapter definitions */
adapters: Record<string, AdapterConfig>;
/**
* Route map: alias -> { adapter, recipient }.
* e.g. "ops" -> { adapter: "telegram", recipient: "-100987654321" }
* Lets cron jobs and other extensions use friendly names.
*/
routes?: Record<string, { adapter: string; recipient: string }>;
/** Chat bridge configuration. */
bridge?: BridgeConfig;
}
// ── Bridge types ────────────────────────────────────────────────
/** A queued prompt waiting to be processed. */
export interface QueuedPrompt {
id: string;
adapter: string;
sender: string;
text: string;
attachments?: IncomingAttachment[];
metadata?: Record<string, unknown>;
enqueuedAt: number;
}
/** Per-sender session state. */
export interface SenderSession {
adapter: string;
sender: string;
displayName: string;
queue: QueuedPrompt[];
processing: boolean;
abortController: AbortController | null;
messageCount: number;
startedAt: number;
}
/** Result from a subprocess run. */
export interface RunResult {
ok: boolean;
response: string;
error?: string;
durationMs: number;
exitCode: number;
}