This commit is contained in:
Harivansh Rathi 2026-03-05 15:55:27 -08:00
parent 863135d429
commit 43337449e3
88 changed files with 18387 additions and 11 deletions

View file

@ -0,0 +1,433 @@
/**
* pi-channels Chat bridge.
*
* Listens for incoming messages (channel:receive), serializes per sender,
* runs prompts via isolated subprocesses, and sends responses back via
* the same adapter. Each sender gets their own FIFO queue. Multiple
* senders run concurrently up to maxConcurrent.
*/
import type {
IncomingMessage,
IncomingAttachment,
QueuedPrompt,
SenderSession,
BridgeConfig,
} from "../types.ts";
import type { ChannelRegistry } from "../registry.ts";
import type { EventBus } from "@mariozechner/pi-coding-agent";
import { runPrompt } from "./runner.ts";
import { RpcSessionManager } from "./rpc-runner.ts";
import { isCommand, handleCommand, type CommandContext } from "./commands.ts";
import { startTyping } from "./typing.ts";
const BRIDGE_DEFAULTS: Required<BridgeConfig> = {
enabled: false,
sessionMode: "persistent",
sessionRules: [],
idleTimeoutMinutes: 30,
maxQueuePerSender: 5,
timeoutMs: 300_000,
maxConcurrent: 2,
model: null,
typingIndicators: true,
commands: true,
extensions: [],
};
type LogFn = (event: string, data: unknown, level?: string) => void;
let idCounter = 0;
function nextId(): string {
return `msg-${Date.now()}-${++idCounter}`;
}
export class ChatBridge {
private config: Required<BridgeConfig>;
private cwd: string;
private registry: ChannelRegistry;
private events: EventBus;
private log: LogFn;
private sessions = new Map<string, SenderSession>();
private activeCount = 0;
private running = false;
private rpcManager: RpcSessionManager | null = null;
constructor(
bridgeConfig: BridgeConfig | undefined,
cwd: string,
registry: ChannelRegistry,
events: EventBus,
log: LogFn = () => {},
) {
this.config = { ...BRIDGE_DEFAULTS, ...bridgeConfig };
this.cwd = cwd;
this.registry = registry;
this.events = events;
this.log = log;
}
// ── Lifecycle ─────────────────────────────────────────────
start(): void {
if (this.running) return;
this.running = true;
// Always create the RPC manager — it's used on-demand for persistent senders
this.rpcManager = new RpcSessionManager(
{
cwd: this.cwd,
model: this.config.model,
timeoutMs: this.config.timeoutMs,
extensions: this.config.extensions,
},
this.config.idleTimeoutMinutes * 60_000,
);
}
stop(): void {
this.running = false;
for (const session of this.sessions.values()) {
session.abortController?.abort();
}
this.sessions.clear();
this.activeCount = 0;
this.rpcManager?.killAll();
this.rpcManager = null;
}
isActive(): boolean {
return this.running;
}
updateConfig(cfg: BridgeConfig): void {
this.config = { ...BRIDGE_DEFAULTS, ...cfg };
}
// ── Main entry point ──────────────────────────────────────
handleMessage(message: IncomingMessage): void {
if (!this.running) return;
const text = message.text?.trim();
const hasAttachments = message.attachments && message.attachments.length > 0;
if (!text && !hasAttachments) return;
// Rejected messages (too large, unsupported type) — send back directly
if (message.metadata?.rejected) {
this.sendReply(message.adapter, message.sender, text || "⚠️ Unsupported message.");
return;
}
const senderKey = `${message.adapter}:${message.sender}`;
// Get or create session
let session = this.sessions.get(senderKey);
if (!session) {
session = this.createSession(message);
this.sessions.set(senderKey, session);
}
// Bot commands (only for text-only messages)
if (text && !hasAttachments && this.config.commands && isCommand(text)) {
const reply = handleCommand(text, session, this.commandContext());
if (reply !== null) {
this.sendReply(message.adapter, message.sender, reply);
return;
}
// Unrecognized command — fall through to agent
}
// Queue depth check
if (session.queue.length >= this.config.maxQueuePerSender) {
this.sendReply(
message.adapter,
message.sender,
`⚠️ Queue full (${this.config.maxQueuePerSender} pending). ` +
`Wait for current prompts to finish or use /abort.`,
);
return;
}
// Enqueue
const queued: QueuedPrompt = {
id: nextId(),
adapter: message.adapter,
sender: message.sender,
text: text || "Describe this.",
attachments: message.attachments,
metadata: message.metadata,
enqueuedAt: Date.now(),
};
session.queue.push(queued);
session.messageCount++;
this.events.emit("bridge:enqueue", {
id: queued.id, adapter: message.adapter, sender: message.sender,
queueDepth: session.queue.length,
});
this.processNext(senderKey);
}
// ── Processing ────────────────────────────────────────────
private async processNext(senderKey: string): Promise<void> {
const session = this.sessions.get(senderKey);
if (!session || session.processing || session.queue.length === 0) return;
if (this.activeCount >= this.config.maxConcurrent) return;
session.processing = true;
this.activeCount++;
const prompt = session.queue.shift()!;
// Typing indicator
const adapter = this.registry.getAdapter(prompt.adapter);
const typing = this.config.typingIndicators
? startTyping(adapter, prompt.sender)
: { stop() {} };
const ac = new AbortController();
session.abortController = ac;
const usePersistent = this.shouldUsePersistent(senderKey);
this.events.emit("bridge:start", {
id: prompt.id, adapter: prompt.adapter, sender: prompt.sender,
text: prompt.text.slice(0, 100),
persistent: usePersistent,
});
try {
let result;
if (usePersistent && this.rpcManager) {
// Persistent mode: use RPC session
result = await this.runWithRpc(senderKey, prompt, ac.signal);
} else {
// Stateless mode: spawn subprocess
result = await runPrompt({
prompt: prompt.text,
cwd: this.cwd,
timeoutMs: this.config.timeoutMs,
model: this.config.model,
signal: ac.signal,
attachments: prompt.attachments,
extensions: this.config.extensions,
});
}
typing.stop();
if (result.ok) {
this.sendReply(prompt.adapter, prompt.sender, result.response);
} else if (result.error === "Aborted by user") {
this.sendReply(prompt.adapter, prompt.sender, "⏹ Aborted.");
} else {
const userError = sanitizeError(result.error);
this.sendReply(
prompt.adapter, prompt.sender,
result.response || `${userError}`,
);
}
this.events.emit("bridge:complete", {
id: prompt.id, adapter: prompt.adapter, sender: prompt.sender,
ok: result.ok, durationMs: result.durationMs,
persistent: usePersistent,
});
this.log("bridge-complete", {
id: prompt.id, adapter: prompt.adapter, ok: result.ok,
durationMs: result.durationMs, persistent: usePersistent,
}, result.ok ? "INFO" : "WARN");
} catch (err: any) {
typing.stop();
this.log("bridge-error", { adapter: prompt.adapter, sender: prompt.sender, error: err.message }, "ERROR");
this.sendReply(prompt.adapter, prompt.sender, `❌ Unexpected error: ${err.message}`);
} finally {
session.abortController = null;
session.processing = false;
this.activeCount--;
if (session.queue.length > 0) this.processNext(senderKey);
this.drainWaiting();
}
}
/** Run a prompt via persistent RPC session. */
private async runWithRpc(
senderKey: string,
prompt: QueuedPrompt,
signal?: AbortSignal,
): Promise<import("../types.ts").RunResult> {
try {
const rpcSession = await this.rpcManager!.getSession(senderKey);
return await rpcSession.runPrompt(prompt.text, {
signal,
attachments: prompt.attachments,
});
} catch (err: any) {
return {
ok: false,
response: "",
error: err.message,
durationMs: 0,
exitCode: 1,
};
}
}
/** After a slot frees up, check other senders waiting for concurrency. */
private drainWaiting(): void {
if (this.activeCount >= this.config.maxConcurrent) return;
for (const [key, session] of this.sessions) {
if (!session.processing && session.queue.length > 0) {
this.processNext(key);
if (this.activeCount >= this.config.maxConcurrent) break;
}
}
}
// ── Session management ────────────────────────────────────
private createSession(message: IncomingMessage): SenderSession {
return {
adapter: message.adapter,
sender: message.sender,
displayName:
(message.metadata?.firstName as string) ||
(message.metadata?.username as string) ||
message.sender,
queue: [],
processing: false,
abortController: null,
messageCount: 0,
startedAt: Date.now(),
};
}
getStats(): {
active: boolean;
sessions: number;
activePrompts: number;
totalQueued: number;
} {
let totalQueued = 0;
for (const s of this.sessions.values()) totalQueued += s.queue.length;
return {
active: this.running,
sessions: this.sessions.size,
activePrompts: this.activeCount,
totalQueued,
};
}
getSessions(): Map<string, SenderSession> {
return this.sessions;
}
// ── Session mode resolution ───────────────────────────────
/**
* Determine if a sender should use persistent (RPC) or stateless mode.
* Checks sessionRules first (first match wins), falls back to sessionMode default.
*/
private shouldUsePersistent(senderKey: string): boolean {
for (const rule of this.config.sessionRules) {
if (globMatch(rule.match, senderKey)) {
return rule.mode === "persistent";
}
}
return this.config.sessionMode === "persistent";
}
// ── Command context ───────────────────────────────────────
private commandContext(): CommandContext {
return {
isPersistent: (sender: string) => {
// Find the sender key to check mode
for (const [key, session] of this.sessions) {
if (session.sender === sender) return this.shouldUsePersistent(key);
}
return this.config.sessionMode === "persistent";
},
abortCurrent: (sender: string): boolean => {
for (const session of this.sessions.values()) {
if (session.sender === sender && session.abortController) {
session.abortController.abort();
return true;
}
}
return false;
},
clearQueue: (sender: string): void => {
for (const session of this.sessions.values()) {
if (session.sender === sender) session.queue.length = 0;
}
},
resetSession: (sender: string): void => {
for (const [key, session] of this.sessions) {
if (session.sender === sender) {
this.sessions.delete(key);
// Also reset persistent RPC session
if (this.rpcManager) {
this.rpcManager.resetSession(key).catch(() => {});
}
}
}
},
};
}
// ── Reply ─────────────────────────────────────────────────
private sendReply(adapter: string, recipient: string, text: string): void {
this.registry.send({ adapter, recipient, text });
}
}
// ── Helpers ───────────────────────────────────────────────────
/**
* Simple glob matcher supporting `*` (any chars) and `?` (single char).
* Used for sessionRules pattern matching against "adapter:senderId" keys.
*/
function globMatch(pattern: string, text: string): boolean {
// Escape regex special chars except * and ?
const re = pattern
.replace(/[.+^${}()|[\]\\]/g, "\\$&")
.replace(/\*/g, ".*")
.replace(/\?/g, ".");
return new RegExp(`^${re}$`).test(text);
}
const MAX_ERROR_LENGTH = 200;
/**
* Sanitize subprocess error output for end-user display.
* Strips stack traces, extension crash logs, and long technical details.
*/
function sanitizeError(error: string | undefined): string {
if (!error) return "Something went wrong. Please try again.";
// Extract the most meaningful line — skip "Extension error" noise and stack traces
const lines = error.split("\n").filter(l => l.trim());
// Find the first line that isn't an extension loading error or stack frame
const meaningful = lines.find(l =>
!l.startsWith("Extension error") &&
!l.startsWith(" at ") &&
!l.startsWith("node:") &&
!l.includes("NODE_MODULE_VERSION") &&
!l.includes("compiled against a different") &&
!l.includes("Emitted 'error' event")
);
const msg = meaningful?.trim() || "Something went wrong. Please try again.";
return msg.length > MAX_ERROR_LENGTH
? msg.slice(0, MAX_ERROR_LENGTH) + "…"
: msg;
}

View file

@ -0,0 +1,131 @@
/**
* pi-channels Bot command handler.
*
* Detects messages starting with / and handles them without routing
* to the agent. Provides built-in commands and a registry for custom ones.
*
* Built-in: /start, /help, /abort, /status, /new
*/
import type { SenderSession } from "../types.ts";
export interface BotCommand {
name: string;
description: string;
handler: (args: string, session: SenderSession | undefined, ctx: CommandContext) => string | null;
}
export interface CommandContext {
abortCurrent: (sender: string) => boolean;
clearQueue: (sender: string) => void;
resetSession: (sender: string) => void;
/** Check if a given sender is using persistent (RPC) mode. */
isPersistent: (sender: string) => boolean;
}
const commands = new Map<string, BotCommand>();
export function isCommand(text: string): boolean {
return /^\/[a-zA-Z]/.test(text.trim());
}
export function parseCommand(text: string): { command: string; args: string } {
const match = text.trim().match(/^\/([a-zA-Z_]+)(?:@\S+)?\s*(.*)/s);
if (!match) return { command: "", args: "" };
return { command: match[1].toLowerCase(), args: match[2].trim() };
}
export function registerCommand(cmd: BotCommand): void {
commands.set(cmd.name.toLowerCase(), cmd);
}
export function unregisterCommand(name: string): void {
commands.delete(name.toLowerCase());
}
export function getAllCommands(): BotCommand[] {
return [...commands.values()].sort((a, b) => a.name.localeCompare(b.name));
}
/**
* Handle a command. Returns reply text, or null if unrecognized
* (fall through to agent).
*/
export function handleCommand(
text: string,
session: SenderSession | undefined,
ctx: CommandContext,
): string | null {
const { command } = parseCommand(text);
if (!command) return null;
const cmd = commands.get(command);
if (!cmd) return null;
const { args } = parseCommand(text);
return cmd.handler(args, session, ctx);
}
// ── Built-in commands ───────────────────────────────────────────
registerCommand({
name: "start",
description: "Welcome message",
handler: () =>
"👋 Hi! I'm your Pi assistant.\n\n" +
"Send me a message and I'll process it. Use /help to see available commands.",
});
registerCommand({
name: "help",
description: "Show available commands",
handler: () => {
const lines = getAllCommands().map((c) => `/${c.name}${c.description}`);
return `**Available commands:**\n\n${lines.join("\n")}`;
},
});
registerCommand({
name: "abort",
description: "Cancel the current prompt",
handler: (_args, session, ctx) => {
if (!session) return "No active session.";
if (!session.processing) return "Nothing is running right now.";
return ctx.abortCurrent(session.sender)
? "⏹ Aborting current prompt..."
: "Failed to abort — nothing running.";
},
});
registerCommand({
name: "status",
description: "Show session info",
handler: (_args, session, ctx) => {
if (!session) return "No active session. Send a message to start one.";
const persistent = ctx.isPersistent(session.sender);
const uptime = Math.floor((Date.now() - session.startedAt) / 1000);
const mins = Math.floor(uptime / 60);
const secs = uptime % 60;
return [
`**Session Status**`,
`- Mode: ${persistent ? "🔗 Persistent (conversation memory)" : "⚡ Stateless (no memory)"}`,
`- State: ${session.processing ? "⏳ Processing..." : "💤 Idle"}`,
`- Messages: ${session.messageCount}`,
`- Queue: ${session.queue.length} pending`,
`- Uptime: ${mins > 0 ? `${mins}m ${secs}s` : `${secs}s`}`,
].join("\n");
},
});
registerCommand({
name: "new",
description: "Clear queue and start fresh conversation",
handler: (_args, session, ctx) => {
if (!session) return "No active session.";
const persistent = ctx.isPersistent(session.sender);
ctx.abortCurrent(session.sender);
ctx.clearQueue(session.sender);
ctx.resetSession(session.sender);
return persistent
? "🔄 Session reset. Conversation context cleared. Queue cleared."
: "🔄 Session reset. Queue cleared.";
},
});

View file

@ -0,0 +1,435 @@
/**
* pi-channels Persistent RPC session runner.
*
* Maintains a long-lived `pi --mode rpc` subprocess per sender,
* enabling persistent conversation context across messages.
* Falls back to stateless runner if RPC fails to start.
*
* Lifecycle:
* 1. First message from a sender spawns a new RPC subprocess
* 2. Subsequent messages reuse the same subprocess (session persists)
* 3. /new command or idle timeout restarts the session
* 4. Subprocess crash triggers auto-restart on next message
*/
import { spawn, type ChildProcess } from "node:child_process";
import * as readline from "node:readline";
import type { RunResult, IncomingAttachment } from "../types.ts";
export interface RpcRunnerOptions {
cwd: string;
model?: string | null;
timeoutMs: number;
extensions?: string[];
}
interface PendingRequest {
resolve: (result: RunResult) => void;
startTime: number;
timer: ReturnType<typeof setTimeout>;
textChunks: string[];
abortHandler?: () => void;
}
/**
* A persistent RPC session for a single sender.
* Wraps a `pi --mode rpc` subprocess.
*/
export class RpcSession {
private child: ChildProcess | null = null;
private rl: readline.Interface | null = null;
private options: RpcRunnerOptions;
private pending: PendingRequest | null = null;
private ready = false;
private startedAt = 0;
private _onStreaming: ((text: string) => void) | null = null;
constructor(options: RpcRunnerOptions) {
this.options = options;
}
/** Spawn the RPC subprocess if not already running. */
async start(): Promise<boolean> {
if (this.child && this.ready) return true;
this.cleanup();
const args = ["--mode", "rpc", "--no-extensions"];
if (this.options.model) args.push("--model", this.options.model);
if (this.options.extensions?.length) {
for (const ext of this.options.extensions) {
args.push("-e", ext);
}
}
try {
this.child = spawn("pi", args, {
cwd: this.options.cwd,
stdio: ["pipe", "pipe", "pipe"],
env: { ...process.env },
});
} catch {
return false;
}
if (!this.child.stdout || !this.child.stdin) {
this.cleanup();
return false;
}
this.rl = readline.createInterface({ input: this.child.stdout });
this.rl.on("line", (line) => this.handleLine(line));
this.child.on("close", () => {
this.ready = false;
// Reject any pending request
if (this.pending) {
const p = this.pending;
this.pending = null;
clearTimeout(p.timer);
const text = p.textChunks.join("");
p.resolve({
ok: false,
response: text || "(session ended)",
error: "RPC subprocess exited unexpectedly",
durationMs: Date.now() - p.startTime,
exitCode: 1,
});
}
this.child = null;
this.rl = null;
});
this.child.on("error", () => {
this.cleanup();
});
this.ready = true;
this.startedAt = Date.now();
return true;
}
/** Send a prompt and collect the full response. */
runPrompt(
prompt: string,
options?: {
signal?: AbortSignal;
attachments?: IncomingAttachment[];
onStreaming?: (text: string) => void;
},
): Promise<RunResult> {
return new Promise(async (resolve) => {
// Ensure subprocess is running
if (!this.ready) {
const ok = await this.start();
if (!ok) {
resolve({
ok: false,
response: "",
error: "Failed to start RPC session",
durationMs: 0,
exitCode: 1,
});
return;
}
}
const startTime = Date.now();
this._onStreaming = options?.onStreaming ?? null;
// Timeout
const timer = setTimeout(() => {
if (this.pending) {
const p = this.pending;
this.pending = null;
const text = p.textChunks.join("");
p.resolve({
ok: false,
response: text || "(timed out)",
error: "Timeout",
durationMs: Date.now() - p.startTime,
exitCode: 124,
});
// Kill and restart on next message
this.cleanup();
}
}, this.options.timeoutMs);
this.pending = { resolve, startTime, timer, textChunks: [] };
// Abort handler
const onAbort = () => {
this.sendCommand({ type: "abort" });
};
if (options?.signal) {
if (options.signal.aborted) {
clearTimeout(timer);
this.pending = null;
this.sendCommand({ type: "abort" });
resolve({
ok: false,
response: "(aborted)",
error: "Aborted by user",
durationMs: Date.now() - startTime,
exitCode: 130,
});
return;
}
options.signal.addEventListener("abort", onAbort, { once: true });
this.pending.abortHandler = () =>
options.signal?.removeEventListener("abort", onAbort);
}
// Build prompt command
const cmd: Record<string, unknown> = {
type: "prompt",
message: prompt,
};
// Attach images as base64
if (options?.attachments?.length) {
const images: Array<Record<string, string>> = [];
for (const att of options.attachments) {
if (att.type === "image") {
try {
const fs = await import("node:fs");
const data = fs.readFileSync(att.path).toString("base64");
images.push({
type: "image",
data,
mimeType: att.mimeType || "image/jpeg",
});
} catch {
// Skip unreadable attachments
}
}
}
if (images.length > 0) cmd.images = images;
}
this.sendCommand(cmd);
});
}
/** Request a new session (clear context). */
async newSession(): Promise<void> {
if (this.ready) {
this.sendCommand({ type: "new_session" });
}
}
/** Check if the subprocess is alive. */
isAlive(): boolean {
return this.ready && this.child !== null;
}
/** Get uptime in ms. */
uptime(): number {
return this.ready ? Date.now() - this.startedAt : 0;
}
/** Kill the subprocess. */
cleanup(): void {
this.ready = false;
this._onStreaming = null;
if (this.pending) {
clearTimeout(this.pending.timer);
this.pending.abortHandler?.();
this.pending = null;
}
if (this.rl) {
this.rl.close();
this.rl = null;
}
if (this.child) {
this.child.kill("SIGTERM");
setTimeout(() => {
if (this.child && !this.child.killed) this.child.kill("SIGKILL");
}, 3000);
this.child = null;
}
}
// ── Private ─────────────────────────────────────────────
private sendCommand(cmd: Record<string, unknown>): void {
if (!this.child?.stdin?.writable) return;
this.child.stdin.write(JSON.stringify(cmd) + "\n");
}
private handleLine(line: string): void {
let event: Record<string, unknown>;
try {
event = JSON.parse(line);
} catch {
return;
}
const type = event.type as string;
// Streaming text deltas
if (type === "message_update") {
const delta = event.assistantMessageEvent as Record<string, unknown> | undefined;
if (delta?.type === "text_delta" && typeof delta.delta === "string") {
if (this.pending) this.pending.textChunks.push(delta.delta);
if (this._onStreaming) this._onStreaming(delta.delta);
}
}
// Agent finished — resolve the pending promise
if (type === "agent_end") {
if (this.pending) {
const p = this.pending;
this.pending = null;
this._onStreaming = null;
clearTimeout(p.timer);
p.abortHandler?.();
const text = p.textChunks.join("").trim();
p.resolve({
ok: true,
response: text || "(no output)",
durationMs: Date.now() - p.startTime,
exitCode: 0,
});
}
}
// Handle errors in message_update (aborted, error)
if (type === "message_update") {
const delta = event.assistantMessageEvent as Record<string, unknown> | undefined;
if (delta?.type === "done" && delta.reason === "error") {
if (this.pending) {
const p = this.pending;
this.pending = null;
this._onStreaming = null;
clearTimeout(p.timer);
p.abortHandler?.();
const text = p.textChunks.join("").trim();
p.resolve({
ok: false,
response: text || "",
error: "Agent error",
durationMs: Date.now() - p.startTime,
exitCode: 1,
});
}
}
}
// Prompt response (just ack, actual result comes via agent_end)
// Response errors
if (type === "response") {
const success = event.success as boolean;
if (!success && this.pending) {
const p = this.pending;
this.pending = null;
this._onStreaming = null;
clearTimeout(p.timer);
p.abortHandler?.();
p.resolve({
ok: false,
response: "",
error: (event.error as string) || "RPC command failed",
durationMs: Date.now() - p.startTime,
exitCode: 1,
});
}
}
}
}
/**
* Manages RPC sessions across multiple senders.
* Each sender gets their own persistent subprocess.
*/
export class RpcSessionManager {
private sessions = new Map<string, RpcSession>();
private options: RpcRunnerOptions;
private idleTimeoutMs: number;
private idleTimers = new Map<string, ReturnType<typeof setTimeout>>();
constructor(
options: RpcRunnerOptions,
idleTimeoutMs = 30 * 60_000, // 30 min default
) {
this.options = options;
this.idleTimeoutMs = idleTimeoutMs;
}
/** Get or create a session for a sender. */
async getSession(senderKey: string): Promise<RpcSession> {
let session = this.sessions.get(senderKey);
if (session && session.isAlive()) {
this.resetIdleTimer(senderKey);
return session;
}
// Clean up dead session
if (session) {
session.cleanup();
this.sessions.delete(senderKey);
}
// Create new
session = new RpcSession(this.options);
const ok = await session.start();
if (!ok) throw new Error("Failed to start RPC session");
this.sessions.set(senderKey, session);
this.resetIdleTimer(senderKey);
return session;
}
/** Reset a sender's session (new conversation). */
async resetSession(senderKey: string): Promise<void> {
const session = this.sessions.get(senderKey);
if (session) {
await session.newSession();
}
}
/** Kill a specific sender's session. */
killSession(senderKey: string): void {
const session = this.sessions.get(senderKey);
if (session) {
session.cleanup();
this.sessions.delete(senderKey);
}
const timer = this.idleTimers.get(senderKey);
if (timer) {
clearTimeout(timer);
this.idleTimers.delete(senderKey);
}
}
/** Kill all sessions. */
killAll(): void {
for (const [key, session] of this.sessions) {
session.cleanup();
}
this.sessions.clear();
for (const timer of this.idleTimers.values()) {
clearTimeout(timer);
}
this.idleTimers.clear();
}
/** Get stats. */
getStats(): { activeSessions: number; senders: string[] } {
return {
activeSessions: this.sessions.size,
senders: [...this.sessions.keys()],
};
}
private resetIdleTimer(senderKey: string): void {
const existing = this.idleTimers.get(senderKey);
if (existing) clearTimeout(existing);
const timer = setTimeout(() => {
this.killSession(senderKey);
}, this.idleTimeoutMs);
this.idleTimers.set(senderKey, timer);
}
}

View file

@ -0,0 +1,100 @@
/**
* pi-channels Subprocess runner for the chat bridge.
*
* Spawns `pi -p --no-session [@files...] <prompt>` to process a single prompt.
* Supports file attachments (images, documents) via the @file syntax.
* Same pattern as pi-cron and pi-heartbeat.
*/
import { spawn, type ChildProcess } from "node:child_process";
import type { RunResult, IncomingAttachment } from "../types.ts";
export interface RunOptions {
prompt: string;
cwd: string;
timeoutMs: number;
model?: string | null;
signal?: AbortSignal;
/** File attachments to include via @file args. */
attachments?: IncomingAttachment[];
/** Explicit extension paths to load (with --no-extensions + -e for each). */
extensions?: string[];
}
export function runPrompt(options: RunOptions): Promise<RunResult> {
const { prompt, cwd, timeoutMs, model, signal, attachments, extensions } = options;
return new Promise((resolve) => {
const startTime = Date.now();
const args = ["-p", "--no-session", "--no-extensions"];
if (model) args.push("--model", model);
// Explicitly load only bridge-safe extensions
if (extensions?.length) {
for (const ext of extensions) {
args.push("-e", ext);
}
}
// Add file attachments as @file args before the prompt
if (attachments?.length) {
for (const att of attachments) {
args.push(`@${att.path}`);
}
}
args.push(prompt);
let child: ChildProcess;
try {
child = spawn("pi", args, {
cwd,
stdio: ["ignore", "pipe", "pipe"],
env: { ...process.env },
timeout: timeoutMs,
});
} catch (err: any) {
resolve({
ok: false, response: "", error: `Failed to spawn: ${err.message}`,
durationMs: Date.now() - startTime, exitCode: 1,
});
return;
}
let stdout = "";
let stderr = "";
child.stdout?.on("data", (chunk: Buffer) => { stdout += chunk.toString(); });
child.stderr?.on("data", (chunk: Buffer) => { stderr += chunk.toString(); });
const onAbort = () => {
child.kill("SIGTERM");
setTimeout(() => { if (!child.killed) child.kill("SIGKILL"); }, 3000);
};
if (signal) {
if (signal.aborted) { onAbort(); }
else { signal.addEventListener("abort", onAbort, { once: true }); }
}
child.on("close", (code) => {
signal?.removeEventListener("abort", onAbort);
const durationMs = Date.now() - startTime;
const response = stdout.trim();
const exitCode = code ?? 1;
if (signal?.aborted) {
resolve({ ok: false, response: response || "(aborted)", error: "Aborted by user", durationMs, exitCode: 130 });
} else if (exitCode !== 0) {
resolve({ ok: false, response, error: stderr.trim() || `Exit code ${exitCode}`, durationMs, exitCode });
} else {
resolve({ ok: true, response: response || "(no output)", durationMs, exitCode: 0 });
}
});
child.on("error", (err) => {
signal?.removeEventListener("abort", onAbort);
resolve({ ok: false, response: "", error: err.message, durationMs: Date.now() - startTime, exitCode: 1 });
});
});
}

View file

@ -0,0 +1,35 @@
/**
* pi-channels Typing indicator manager.
*
* Sends periodic typing chat actions via the adapter's sendTyping method.
* Telegram typing indicators expire after ~5s, so we refresh every 4s.
* For adapters without sendTyping, this is a no-op.
*/
import type { ChannelAdapter } from "../types.ts";
const TYPING_INTERVAL_MS = 4_000;
/**
* Start sending typing indicators. Returns a stop() handle.
* No-op if the adapter doesn't support sendTyping.
*/
export function startTyping(
adapter: ChannelAdapter | undefined,
recipient: string,
): { stop: () => void } {
if (!adapter?.sendTyping) return { stop() {} };
// Fire immediately
adapter.sendTyping(recipient).catch(() => {});
const timer = setInterval(() => {
adapter.sendTyping!(recipient).catch(() => {});
}, TYPING_INTERVAL_MS);
return {
stop() {
clearInterval(timer);
},
};
}