new gateway

This commit is contained in:
Harivansh Rathi 2026-03-05 18:58:27 -08:00
parent 01958298e0
commit 9a0b848789
34 changed files with 1632 additions and 290 deletions

View file

@ -187,7 +187,8 @@ ${chalk.bold("Commands:")}
${APP_NAME} remove <source> [-l] Remove extension source from settings
${APP_NAME} update [source] Update installed extensions (skips pinned sources)
${APP_NAME} list List installed extensions from settings
${APP_NAME} daemon Run in long-lived daemon mode (extensions stay active)
${APP_NAME} gateway Run the always-on gateway process
${APP_NAME} daemon Alias for gateway
${APP_NAME} config Open TUI to enable/disable package resources
${APP_NAME} <command> --help Show help for install/remove/update/list

View file

@ -0,0 +1,652 @@
import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
import { join } from "node:path";
import { URL } from "node:url";
import type { ImageContent } from "@mariozechner/pi-ai";
import type { AgentSession, AgentSessionEvent } from "./agent-session.js";
import { SessionManager } from "./session-manager.js";
export interface GatewayConfig {
bind: string;
port: number;
bearerToken?: string;
session: {
idleMinutes: number;
maxQueuePerSession: number;
};
webhook: {
enabled: boolean;
basePath: string;
secret?: string;
};
}
export type GatewaySessionFactory = (sessionKey: string) => Promise<AgentSession>;
export interface GatewayMessageRequest {
sessionKey: string;
text: string;
source?: "interactive" | "rpc" | "extension";
images?: ImageContent[];
metadata?: Record<string, unknown>;
}
export interface GatewayMessageResult {
ok: boolean;
response: string;
error?: string;
sessionKey: string;
}
export interface GatewaySessionSnapshot {
sessionKey: string;
sessionId: string;
messageCount: number;
queueDepth: number;
processing: boolean;
lastActiveAt: number;
createdAt: number;
}
export interface GatewayRuntimeOptions {
config: GatewayConfig;
primarySessionKey: string;
primarySession: AgentSession;
createSession: GatewaySessionFactory;
log?: (message: string) => void;
}
interface GatewayQueuedMessage {
request: GatewayMessageRequest;
resolve: (result: GatewayMessageResult) => void;
}
type GatewayEvent =
| { type: "hello"; sessionKey: string; snapshot: GatewaySessionSnapshot }
| { type: "session_state"; sessionKey: string; snapshot: GatewaySessionSnapshot }
| { type: "turn_start"; sessionKey: string }
| { type: "turn_end"; sessionKey: string }
| { type: "message_start"; sessionKey: string; role?: string }
| { type: "token"; sessionKey: string; delta: string; contentIndex: number }
| { type: "thinking"; sessionKey: string; delta: string; contentIndex: number }
| { type: "tool_start"; sessionKey: string; toolCallId: string; toolName: string; args: unknown }
| { type: "tool_update"; sessionKey: string; toolCallId: string; toolName: string; partialResult: unknown }
| {
type: "tool_complete";
sessionKey: string;
toolCallId: string;
toolName: string;
result: unknown;
isError: boolean;
}
| { type: "message_complete"; sessionKey: string; text: string }
| { type: "error"; sessionKey: string; error: string }
| { type: "aborted"; sessionKey: string };
interface ManagedGatewaySession {
sessionKey: string;
session: AgentSession;
queue: GatewayQueuedMessage[];
processing: boolean;
createdAt: number;
lastActiveAt: number;
listeners: Set<(event: GatewayEvent) => void>;
unsubscribe: () => void;
}
let activeGatewayRuntime: GatewayRuntime | null = null;
export function setActiveGatewayRuntime(runtime: GatewayRuntime | null): void {
activeGatewayRuntime = runtime;
}
export function getActiveGatewayRuntime(): GatewayRuntime | null {
return activeGatewayRuntime;
}
export class GatewayRuntime {
private readonly config: GatewayConfig;
private readonly primarySessionKey: string;
private readonly primarySession: AgentSession;
private readonly createSession: GatewaySessionFactory;
private readonly log: (message: string) => void;
private readonly sessions = new Map<string, ManagedGatewaySession>();
private readonly sessionDirRoot: string;
private server: Server | null = null;
private idleSweepTimer: NodeJS.Timeout | null = null;
private ready = false;
constructor(options: GatewayRuntimeOptions) {
this.config = options.config;
this.primarySessionKey = options.primarySessionKey;
this.primarySession = options.primarySession;
this.createSession = options.createSession;
this.log = options.log ?? (() => {});
this.sessionDirRoot = join(options.primarySession.sessionManager.getSessionDir(), "..", "gateway-sessions");
}
async start(): Promise<void> {
if (this.server) return;
await this.ensureSession(this.primarySessionKey, this.primarySession);
this.server = createServer((request, response) => {
void this.handleHttpRequest(request, response).catch((error) => {
const message = error instanceof Error ? error.message : String(error);
this.writeJson(response, 500, { error: message });
});
});
await new Promise<void>((resolve, reject) => {
this.server?.once("error", reject);
this.server?.listen(this.config.port, this.config.bind, () => {
this.server?.off("error", reject);
resolve();
});
});
this.idleSweepTimer = setInterval(() => {
void this.evictIdleSessions();
}, 60_000);
this.ready = true;
}
async stop(): Promise<void> {
this.ready = false;
if (this.idleSweepTimer) {
clearInterval(this.idleSweepTimer);
this.idleSweepTimer = null;
}
if (this.server) {
await new Promise<void>((resolve, reject) => {
this.server?.close((error) => {
if (error) {
reject(error);
return;
}
resolve();
});
});
this.server = null;
}
for (const [sessionKey, managedSession] of this.sessions) {
managedSession.unsubscribe();
if (sessionKey !== this.primarySessionKey) {
managedSession.session.dispose();
}
}
this.sessions.clear();
}
isReady(): boolean {
return this.ready;
}
getAddress(): { bind: string; port: number } {
return { bind: this.config.bind, port: this.config.port };
}
async enqueueMessage(request: GatewayMessageRequest): Promise<GatewayMessageResult> {
const managedSession = await this.ensureSession(request.sessionKey);
if (managedSession.queue.length >= this.config.session.maxQueuePerSession) {
return {
ok: false,
response: "",
error: `Queue full (${this.config.session.maxQueuePerSession} pending).`,
sessionKey: request.sessionKey,
};
}
return new Promise<GatewayMessageResult>((resolve) => {
managedSession.queue.push({ request, resolve });
this.emitState(managedSession);
void this.processNext(managedSession);
});
}
async addSubscriber(sessionKey: string, listener: (event: GatewayEvent) => void): Promise<() => void> {
const managedSession = await this.ensureSession(sessionKey);
managedSession.listeners.add(listener);
listener({ type: "hello", sessionKey, snapshot: this.createSnapshot(managedSession) });
return () => {
managedSession.listeners.delete(listener);
};
}
abortSession(sessionKey: string): boolean {
const managedSession = this.sessions.get(sessionKey);
if (!managedSession?.processing) {
return false;
}
void managedSession.session.abort().catch((error) => {
this.emit(managedSession, {
type: "error",
sessionKey,
error: error instanceof Error ? error.message : String(error),
});
});
return true;
}
clearQueue(sessionKey: string): void {
const managedSession = this.sessions.get(sessionKey);
if (!managedSession) return;
managedSession.queue.length = 0;
this.emitState(managedSession);
}
async resetSession(sessionKey: string): Promise<void> {
const managedSession = this.sessions.get(sessionKey);
if (!managedSession) return;
if (sessionKey === this.primarySessionKey) {
await managedSession.session.newSession();
managedSession.queue.length = 0;
managedSession.processing = false;
managedSession.lastActiveAt = Date.now();
this.emitState(managedSession);
return;
}
if (managedSession.processing) {
await managedSession.session.abort();
}
managedSession.unsubscribe();
managedSession.session.dispose();
this.sessions.delete(sessionKey);
}
listSessions(): GatewaySessionSnapshot[] {
return Array.from(this.sessions.values()).map((session) => this.createSnapshot(session));
}
getSession(sessionKey: string): GatewaySessionSnapshot | undefined {
const session = this.sessions.get(sessionKey);
return session ? this.createSnapshot(session) : undefined;
}
private async ensureSession(sessionKey: string, existingSession?: AgentSession): Promise<ManagedGatewaySession> {
const found = this.sessions.get(sessionKey);
if (found) {
found.lastActiveAt = Date.now();
return found;
}
const session = existingSession ?? (await this.createSession(sessionKey));
const managedSession: ManagedGatewaySession = {
sessionKey,
session,
queue: [],
processing: false,
createdAt: Date.now(),
lastActiveAt: Date.now(),
listeners: new Set(),
unsubscribe: () => {},
};
managedSession.unsubscribe = session.subscribe((event) => {
this.handleSessionEvent(managedSession, event);
});
this.sessions.set(sessionKey, managedSession);
this.emitState(managedSession);
return managedSession;
}
private async processNext(managedSession: ManagedGatewaySession): Promise<void> {
if (managedSession.processing || managedSession.queue.length === 0) {
return;
}
const queued = managedSession.queue.shift();
if (!queued) return;
managedSession.processing = true;
managedSession.lastActiveAt = Date.now();
this.emitState(managedSession);
try {
await managedSession.session.prompt(queued.request.text, {
images: queued.request.images,
source: queued.request.source ?? "extension",
});
const response = getLastAssistantText(managedSession.session);
queued.resolve({
ok: true,
response,
sessionKey: managedSession.sessionKey,
});
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (message.includes("aborted")) {
this.emit(managedSession, { type: "aborted", sessionKey: managedSession.sessionKey });
} else {
this.emit(managedSession, { type: "error", sessionKey: managedSession.sessionKey, error: message });
}
queued.resolve({
ok: false,
response: "",
error: message,
sessionKey: managedSession.sessionKey,
});
} finally {
managedSession.processing = false;
managedSession.lastActiveAt = Date.now();
this.emitState(managedSession);
if (managedSession.queue.length > 0) {
void this.processNext(managedSession);
}
}
}
private handleSessionEvent(managedSession: ManagedGatewaySession, event: AgentSessionEvent): void {
switch (event.type) {
case "turn_start":
this.emit(managedSession, { type: "turn_start", sessionKey: managedSession.sessionKey });
return;
case "turn_end":
this.emit(managedSession, { type: "turn_end", sessionKey: managedSession.sessionKey });
return;
case "message_start":
this.emit(managedSession, {
type: "message_start",
sessionKey: managedSession.sessionKey,
role: event.message.role,
});
return;
case "message_update":
switch (event.assistantMessageEvent.type) {
case "text_delta":
this.emit(managedSession, {
type: "token",
sessionKey: managedSession.sessionKey,
delta: event.assistantMessageEvent.delta,
contentIndex: event.assistantMessageEvent.contentIndex,
});
return;
case "thinking_delta":
this.emit(managedSession, {
type: "thinking",
sessionKey: managedSession.sessionKey,
delta: event.assistantMessageEvent.delta,
contentIndex: event.assistantMessageEvent.contentIndex,
});
return;
}
return;
case "message_end":
if (event.message.role === "assistant") {
this.emit(managedSession, {
type: "message_complete",
sessionKey: managedSession.sessionKey,
text: extractMessageText(event.message),
});
}
return;
case "tool_execution_start":
this.emit(managedSession, {
type: "tool_start",
sessionKey: managedSession.sessionKey,
toolCallId: event.toolCallId,
toolName: event.toolName,
args: event.args,
});
return;
case "tool_execution_update":
this.emit(managedSession, {
type: "tool_update",
sessionKey: managedSession.sessionKey,
toolCallId: event.toolCallId,
toolName: event.toolName,
partialResult: event.partialResult,
});
return;
case "tool_execution_end":
this.emit(managedSession, {
type: "tool_complete",
sessionKey: managedSession.sessionKey,
toolCallId: event.toolCallId,
toolName: event.toolName,
result: event.result,
isError: event.isError,
});
return;
}
}
private emit(managedSession: ManagedGatewaySession, event: GatewayEvent): void {
for (const listener of managedSession.listeners) {
listener(event);
}
}
private emitState(managedSession: ManagedGatewaySession): void {
this.emit(managedSession, {
type: "session_state",
sessionKey: managedSession.sessionKey,
snapshot: this.createSnapshot(managedSession),
});
}
private createSnapshot(managedSession: ManagedGatewaySession): GatewaySessionSnapshot {
return {
sessionKey: managedSession.sessionKey,
sessionId: managedSession.session.sessionId,
messageCount: managedSession.session.messages.length,
queueDepth: managedSession.queue.length,
processing: managedSession.processing,
lastActiveAt: managedSession.lastActiveAt,
createdAt: managedSession.createdAt,
};
}
private async evictIdleSessions(): Promise<void> {
const cutoff = Date.now() - this.config.session.idleMinutes * 60_000;
for (const [sessionKey, managedSession] of this.sessions) {
if (sessionKey === this.primarySessionKey) {
continue;
}
if (managedSession.processing || managedSession.queue.length > 0) {
continue;
}
if (managedSession.lastActiveAt > cutoff) {
continue;
}
if (managedSession.listeners.size > 0) {
continue;
}
managedSession.unsubscribe();
managedSession.session.dispose();
this.sessions.delete(sessionKey);
this.log(`evicted idle session ${sessionKey}`);
}
}
private async handleHttpRequest(request: IncomingMessage, response: ServerResponse): Promise<void> {
const method = request.method ?? "GET";
const url = new URL(
request.url ?? "/",
`http://${request.headers.host ?? `${this.config.bind}:${this.config.port}`}`,
);
const path = url.pathname;
if (method === "GET" && path === "/health") {
this.writeJson(response, 200, { ok: true, ready: this.ready });
return;
}
if (method === "GET" && path === "/ready") {
this.requireAuth(request, response);
if (response.writableEnded) return;
this.writeJson(response, 200, { ok: true, ready: this.ready, sessions: this.sessions.size });
return;
}
if (this.config.webhook.enabled && method === "POST" && path.startsWith(this.config.webhook.basePath)) {
await this.handleWebhookRequest(path, request, response);
return;
}
this.requireAuth(request, response);
if (response.writableEnded) return;
if (method === "GET" && path === "/sessions") {
this.writeJson(response, 200, { sessions: this.listSessions() });
return;
}
const sessionMatch = path.match(/^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset))?$/);
if (!sessionMatch) {
this.writeJson(response, 404, { error: "Not found" });
return;
}
const sessionKey = decodeURIComponent(sessionMatch[1]);
const action = sessionMatch[2];
if (!action && method === "GET") {
const session = await this.ensureSession(sessionKey);
this.writeJson(response, 200, { session: this.createSnapshot(session) });
return;
}
if (action === "events" && method === "GET") {
await this.handleSse(sessionKey, request, response);
return;
}
if (action === "messages" && method === "POST") {
const body = await this.readJsonBody(request);
const text = typeof body.text === "string" ? body.text : "";
if (!text.trim()) {
this.writeJson(response, 400, { error: "Missing text" });
return;
}
const result = await this.enqueueMessage({
sessionKey,
text,
source: "extension",
});
this.writeJson(response, result.ok ? 200 : 500, result);
return;
}
if (action === "abort" && method === "POST") {
this.writeJson(response, 200, { ok: this.abortSession(sessionKey) });
return;
}
if (action === "reset" && method === "POST") {
await this.resetSession(sessionKey);
this.writeJson(response, 200, { ok: true });
return;
}
this.writeJson(response, 405, { error: "Method not allowed" });
}
private async handleWebhookRequest(path: string, request: IncomingMessage, response: ServerResponse): Promise<void> {
const route = path.slice(this.config.webhook.basePath.length).replace(/^\/+/, "") || "default";
if (this.config.webhook.secret) {
const presentedSecret = request.headers["x-pi-webhook-secret"];
if (presentedSecret !== this.config.webhook.secret) {
this.writeJson(response, 401, { error: "Invalid webhook secret" });
return;
}
}
const body = await this.readJsonBody(request);
const text = typeof body.text === "string" ? body.text : "";
if (!text.trim()) {
this.writeJson(response, 400, { error: "Missing text" });
return;
}
const conversationId =
typeof body.sessionKey === "string"
? body.sessionKey
: `webhook:${route}:${typeof body.sender === "string" ? body.sender : "default"}`;
const result = await this.enqueueMessage({
sessionKey: conversationId,
text,
source: "extension",
metadata: typeof body.metadata === "object" && body.metadata ? (body.metadata as Record<string, unknown>) : {},
});
this.writeJson(response, result.ok ? 200 : 500, result);
}
private async handleSse(sessionKey: string, request: IncomingMessage, response: ServerResponse): Promise<void> {
response.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
});
response.write("\n");
const unsubscribe = await this.addSubscriber(sessionKey, (event) => {
response.write(`data: ${JSON.stringify(event)}\n\n`);
});
request.on("close", () => {
unsubscribe();
});
}
private requireAuth(request: IncomingMessage, response: ServerResponse): void {
if (!this.config.bearerToken) {
return;
}
const header = request.headers.authorization;
if (header === `Bearer ${this.config.bearerToken}`) {
return;
}
this.writeJson(response, 401, { error: "Unauthorized" });
}
private async readJsonBody(request: IncomingMessage): Promise<Record<string, unknown>> {
const chunks: Buffer[] = [];
for await (const chunk of request) {
chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk);
}
if (chunks.length === 0) {
return {};
}
const body = Buffer.concat(chunks).toString("utf8");
return JSON.parse(body) as Record<string, unknown>;
}
private writeJson(response: ServerResponse, statusCode: number, payload: unknown): void {
response.statusCode = statusCode;
response.setHeader("content-type", "application/json; charset=utf-8");
response.end(JSON.stringify(payload));
}
getGatewaySessionDir(sessionKey: string): string {
return join(this.sessionDirRoot, sanitizeSessionKey(sessionKey));
}
}
function extractMessageText(message: { content: unknown }): string {
if (!Array.isArray(message.content)) {
return "";
}
return message.content
.filter((part): part is { type: "text"; text: string } => {
return typeof part === "object" && part !== null && "type" in part && "text" in part && part.type === "text";
})
.map((part) => part.text)
.join("");
}
function getLastAssistantText(session: AgentSession): string {
for (let index = session.messages.length - 1; index >= 0; index--) {
const message = session.messages[index];
if (message.role === "assistant") {
return extractMessageText(message);
}
}
return "";
}
export function sanitizeSessionKey(sessionKey: string): string {
return sessionKey.replace(/[^a-zA-Z0-9._-]/g, "_");
}
export function createGatewaySessionManager(cwd: string, sessionKey: string, sessionDirRoot: string): SessionManager {
return SessionManager.create(cwd, join(sessionDirRoot, sanitizeSessionKey(sessionKey)));
}

View file

@ -43,6 +43,26 @@ export interface MarkdownSettings {
codeBlockIndent?: string; // default: " "
}
export interface GatewaySessionSettings {
idleMinutes?: number;
maxQueuePerSession?: number;
}
export interface GatewayWebhookSettings {
enabled?: boolean;
basePath?: string;
secret?: string;
}
export interface GatewaySettings {
enabled?: boolean;
bind?: string;
port?: number;
bearerToken?: string;
session?: GatewaySessionSettings;
webhook?: GatewayWebhookSettings;
}
export type TransportSetting = Transport;
/**
@ -93,6 +113,7 @@ export interface Settings {
autocompleteMaxVisible?: number; // Max visible items in autocomplete dropdown (default: 5)
showHardwareCursor?: boolean; // Show terminal cursor while still positioning it for IME
markdown?: MarkdownSettings;
gateway?: GatewaySettings;
}
/** Deep merge settings: project/overrides take precedence, nested objects merge recursively */
@ -912,4 +933,8 @@ export class SettingsManager {
getCodeBlockIndent(): string {
return this.settings.markdown?.codeBlockIndent ?? " ";
}
getGatewaySettings(): GatewaySettings {
return structuredClone(this.settings.gateway ?? {});
}
}

View file

@ -140,6 +140,19 @@ export {
} from "./core/extensions/index.js";
// Footer data provider (git branch + extension statuses - data not otherwise available to extensions)
export type { ReadonlyFooterDataProvider } from "./core/footer-data-provider.js";
export {
createGatewaySessionManager,
type GatewayConfig,
type GatewayMessageRequest,
type GatewayMessageResult,
GatewayRuntime,
type GatewayRuntimeOptions,
type GatewaySessionFactory,
type GatewaySessionSnapshot,
getActiveGatewayRuntime,
sanitizeSessionKey,
setActiveGatewayRuntime,
} from "./core/gateway-runtime.js";
export { convertToLlm } from "./core/messages.js";
export { ModelRegistry } from "./core/model-registry.js";
export type {
@ -198,6 +211,7 @@ export {
} from "./core/session-manager.js";
export {
type CompactionSettings,
type GatewaySettings,
type ImageSettings,
type PackageSource,
type RetrySettings,

View file

@ -5,6 +5,7 @@
* createAgentSession() options. The SDK does the heavy lifting.
*/
import { join } from "node:path";
import { type ImageContent, modelsAreEqual, supportsXhigh } from "@mariozechner/pi-ai";
import chalk from "chalk";
import { createInterface } from "readline";
@ -17,6 +18,7 @@ import { APP_NAME, getAgentDir, getModelsPath, VERSION } from "./config.js";
import { AuthStorage } from "./core/auth-storage.js";
import { exportFromFile } from "./core/export-html/index.js";
import type { LoadExtensionsResult } from "./core/extensions/index.js";
import { createGatewaySessionManager } from "./core/gateway-runtime.js";
import { KeybindingsManager } from "./core/keybindings.js";
import { ModelRegistry } from "./core/model-registry.js";
import { resolveCliModel, resolveModelScope, type ScopedModel } from "./core/model-resolver.js";
@ -81,9 +83,10 @@ interface PackageCommandOptions {
function printDaemonHelp(): void {
console.log(`${chalk.bold("Usage:")}
${APP_NAME} gateway [options] [messages...]
${APP_NAME} daemon [options] [messages...]
Run pi as a long-lived daemon (non-interactive) with extensions enabled.
Run pi as a long-lived gateway (non-interactive) with extensions enabled.
Messages passed as positional args are sent once at startup.
Options:
@ -553,9 +556,9 @@ async function handleConfigCommand(args: string[]): Promise<boolean> {
}
export async function main(args: string[]) {
const isDaemonCommand = args[0] === "daemon";
const parsedArgs = isDaemonCommand ? args.slice(1) : args;
const offlineMode = args.includes("--offline") || isTruthyEnvFlag(process.env.PI_OFFLINE);
const isGatewayCommand = args[0] === "daemon" || args[0] === "gateway";
const parsedArgs = isGatewayCommand ? args.slice(1) : args;
const offlineMode = parsedArgs.includes("--offline") || isTruthyEnvFlag(process.env.PI_OFFLINE);
if (offlineMode) {
process.env.PI_OFFLINE = "1";
process.env.PI_SKIP_VERSION_CHECK = "1";
@ -634,7 +637,7 @@ export async function main(args: string[]) {
}
if (parsed.help) {
if (isDaemonCommand) {
if (isGatewayCommand) {
printDaemonHelp();
} else {
printHelp();
@ -648,13 +651,13 @@ export async function main(args: string[]) {
process.exit(0);
}
if (isDaemonCommand && parsed.mode === "rpc") {
console.error(chalk.red("Cannot use --mode rpc with the daemon command."));
if (isGatewayCommand && parsed.mode === "rpc") {
console.error(chalk.red("Cannot use --mode rpc with the gateway command."));
process.exit(1);
}
// Read piped stdin content (if any) - skip for daemon and RPC modes
if (!isDaemonCommand && parsed.mode !== "rpc") {
if (!isGatewayCommand && parsed.mode !== "rpc") {
const stdinContent = await readPipedStdin();
if (stdinContent !== undefined) {
// Force print mode since interactive mode requires a TTY for keyboard input
@ -684,7 +687,7 @@ export async function main(args: string[]) {
}
const { initialMessage, initialImages } = await prepareInitialMessage(parsed, settingsManager.getImageAutoResize());
const isInteractive = !isDaemonCommand && !parsed.print && parsed.mode === undefined;
const isInteractive = !isGatewayCommand && !parsed.print && parsed.mode === undefined;
const mode = parsed.mode || "text";
initTheme(settingsManager.getTheme(), isInteractive);
@ -789,11 +792,44 @@ export async function main(args: string[]) {
verbose: parsed.verbose,
});
await mode.run();
} else if (isDaemonCommand) {
} else if (isGatewayCommand) {
const gatewayLoaderOptions = {
additionalExtensionPaths: firstPass.extensions,
additionalSkillPaths: firstPass.skills,
additionalPromptTemplatePaths: firstPass.promptTemplates,
additionalThemePaths: firstPass.themes,
noExtensions: firstPass.noExtensions,
noSkills: firstPass.noSkills,
noPromptTemplates: firstPass.noPromptTemplates,
noThemes: firstPass.noThemes,
systemPrompt: firstPass.systemPrompt,
appendSystemPrompt: firstPass.appendSystemPrompt,
};
const gatewaySessionRoot = join(agentDir, "gateway-sessions");
const daemonOptions: DaemonModeOptions = {
initialMessage,
initialImages,
messages: parsed.messages,
gateway: settingsManager.getGatewaySettings(),
createSession: async (sessionKey) => {
const gatewayResourceLoader = new DefaultResourceLoader({
cwd,
agentDir,
settingsManager,
...gatewayLoaderOptions,
});
await gatewayResourceLoader.reload();
const gatewaySessionOptions: CreateAgentSessionOptions = {
...sessionOptions,
authStorage,
modelRegistry,
settingsManager,
resourceLoader: gatewayResourceLoader,
sessionManager: createGatewaySessionManager(cwd, sessionKey, gatewaySessionRoot),
};
const { session: gatewaySession } = await createAgentSession(gatewaySessionOptions);
return gatewaySession;
},
};
await runDaemonMode(session, daemonOptions);
} else {

View file

@ -8,6 +8,8 @@
import type { ImageContent } from "@mariozechner/pi-ai";
import type { AgentSession } from "../core/agent-session.js";
import { GatewayRuntime, type GatewaySessionFactory, setActiveGatewayRuntime } from "../core/gateway-runtime.js";
import type { GatewaySettings } from "../core/settings-manager.js";
/**
* Options for daemon mode.
@ -19,6 +21,10 @@ export interface DaemonModeOptions {
initialImages?: ImageContent[];
/** Additional startup messages (sent after initialMessage, one by one). */
messages?: string[];
/** Factory for creating additional gateway-owned sessions. */
createSession: GatewaySessionFactory;
/** Gateway config from settings/env. */
gateway: GatewaySettings;
}
function createCommandContextActions(session: AgentSession) {
@ -71,12 +77,39 @@ export async function runDaemonMode(session: AgentSession, options: DaemonModeOp
const ready = new Promise<void>((resolve) => {
resolveReady = resolve;
});
const gatewayBind = process.env.PI_GATEWAY_BIND ?? options.gateway.bind ?? "127.0.0.1";
const gatewayPort = Number.parseInt(process.env.PI_GATEWAY_PORT ?? "", 10) || options.gateway.port || 8787;
const gatewayToken = process.env.PI_GATEWAY_TOKEN ?? options.gateway.bearerToken;
const gateway = new GatewayRuntime({
config: {
bind: gatewayBind,
port: gatewayPort,
bearerToken: gatewayToken,
session: {
idleMinutes: options.gateway.session?.idleMinutes ?? 60,
maxQueuePerSession: options.gateway.session?.maxQueuePerSession ?? 8,
},
webhook: {
enabled: options.gateway.webhook?.enabled ?? true,
basePath: options.gateway.webhook?.basePath ?? "/webhooks",
secret: process.env.PI_GATEWAY_WEBHOOK_SECRET ?? options.gateway.webhook?.secret,
},
},
primarySessionKey: "web:main",
primarySession: session,
createSession: options.createSession,
log: (message) => {
console.error(`[pi-gateway] ${message}`);
},
});
const shutdown = async (reason: "signal" | "extension"): Promise<void> => {
if (isShuttingDown) return;
isShuttingDown = true;
console.error(`[co-mono-daemon] shutdown requested: ${reason}`);
console.error(`[pi-gateway] shutdown requested: ${reason}`);
setActiveGatewayRuntime(null);
await gateway.stop();
const runner = session.extensionRunner;
if (runner?.hasHandlers("session_shutdown")) {
@ -90,7 +123,7 @@ export async function runDaemonMode(session: AgentSession, options: DaemonModeOp
const handleShutdownSignal = (signal: NodeJS.Signals) => {
void shutdown("signal").catch((error) => {
console.error(
`[co-mono-daemon] shutdown failed for ${signal}: ${error instanceof Error ? error.message : String(error)}`,
`[pi-gateway] shutdown failed for ${signal}: ${error instanceof Error ? error.message : String(error)}`,
);
process.exit(1);
});
@ -102,7 +135,7 @@ export async function runDaemonMode(session: AgentSession, options: DaemonModeOp
process.once("SIGHUP", () => handleShutdownSignal("SIGHUP"));
process.on("unhandledRejection", (error) => {
console.error(`[co-mono-daemon] unhandled rejection: ${error instanceof Error ? error.message : String(error)}`);
console.error(`[pi-gateway] unhandled rejection: ${error instanceof Error ? error.message : String(error)}`);
});
await session.bindExtensions({
@ -110,7 +143,7 @@ export async function runDaemonMode(session: AgentSession, options: DaemonModeOp
shutdownHandler: () => {
void shutdown("extension").catch((error) => {
console.error(
`[co-mono-daemon] extension shutdown failed: ${error instanceof Error ? error.message : String(error)}`,
`[pi-gateway] extension shutdown failed: ${error instanceof Error ? error.message : String(error)}`,
);
process.exit(1);
});
@ -135,7 +168,11 @@ export async function runDaemonMode(session: AgentSession, options: DaemonModeOp
await session.prompt(message);
}
console.error(`[co-mono-daemon] startup complete (session=${session.sessionId ?? "unknown"})`);
await gateway.start();
setActiveGatewayRuntime(gateway);
console.error(
`[pi-gateway] startup complete (session=${session.sessionId ?? "unknown"}, bind=${gatewayBind}, port=${gatewayPort})`,
);
// Keep process alive forever.
const keepAlive = setInterval(() => {

View file

@ -2,6 +2,7 @@
"name": "@e9n/pi-channels",
"version": "0.1.0",
"description": "Two-way channel extension for pi — route messages between agents and Telegram, webhooks, and custom adapters",
"type": "module",
"keywords": [
"pi-package"
],

View file

@ -39,8 +39,8 @@
import { SocketModeClient } from "@slack/socket-mode";
import { WebClient } from "@slack/web-api";
import { getChannelSetting } from "../config.ts";
import type { AdapterConfig, ChannelAdapter, ChannelMessage, OnIncomingMessage } from "../types.ts";
import { getChannelSetting } from "../config.js";
import type { AdapterConfig, ChannelAdapter, ChannelMessage, OnIncomingMessage } from "../types.js";
const MAX_LENGTH = 3000; // Slack block text limit; actual API limit is 4000 but leave margin
@ -146,7 +146,7 @@ export function createSlackAdapter(config: AdapterConfig, cwd?: string, log?: Sl
return {
direction: "bidirectional" as const,
async sendTyping(recipient: string): Promise<void> {
async sendTyping(_recipient: string): Promise<void> {
// Slack doesn't have a direct "typing" API for bots in channels.
// We can use a reaction or simply no-op. For DMs, there's no API either.
// Best we can do is nothing — Slack bots don't show typing indicators.
@ -309,7 +309,7 @@ export function createSlackAdapter(config: AdapterConfig, cwd?: string, log?: Sl
);
// ── Interactive payloads (future: button clicks, modals) ──
socketClient.on("interactive", async ({ body, ack }: { body: any; ack: () => Promise<void> }) => {
socketClient.on("interactive", async ({ body: _body, ack }: { body: any; ack: () => Promise<void> }) => {
try {
await ack();
// TODO: handle interactive payloads (block actions, modals)

View file

@ -36,8 +36,8 @@ import type {
IncomingMessage,
OnIncomingMessage,
TranscriptionConfig,
} from "../types.ts";
import { createTranscriptionProvider, type TranscriptionProvider } from "./transcription.ts";
} from "../types.js";
import { createTranscriptionProvider, type TranscriptionProvider } from "./transcription.js";
const MAX_LENGTH = 4096;
const MAX_FILE_SIZE = 1_048_576; // 1MB
@ -388,7 +388,6 @@ export function createTelegramAdapter(config: AdapterConfig): ChannelAdapter {
};
}
const ext = path.extname(filename || "").toLowerCase();
const attachment: IncomingAttachment = {
type: "image",
path: downloaded.localPath,
@ -472,7 +471,7 @@ export function createTelegramAdapter(config: AdapterConfig): ChannelAdapter {
return {
adapter: "telegram",
sender: chatId,
text: `🎵 ${filename || "audio"} (transcription failed${result.error ? ": " + result.error : ""})`,
text: `🎵 ${filename || "audio"} (transcription failed${result.error ? `: ${result.error}` : ""})`,
metadata: { ...metadata, hasAudio: true },
};
}
@ -535,7 +534,7 @@ export function createTelegramAdapter(config: AdapterConfig): ChannelAdapter {
return {
adapter: "telegram",
sender: chatId,
text: `🎤 (voice message — transcription failed${result.error ? ": " + result.error : ""})`,
text: `🎤 (voice message — transcription failed${result.error ? `: ${result.error}` : ""})`,
metadata: { ...metadata, hasVoice: true, voiceDuration: voice.duration },
};
}
@ -588,7 +587,7 @@ export function createTelegramAdapter(config: AdapterConfig): ChannelAdapter {
return {
adapter: "telegram",
sender: chatId,
text: `🎵 ${audioName} (transcription failed${result.error ? ": " + result.error : ""})`,
text: `🎵 ${audioName} (transcription failed${result.error ? `: ${result.error}` : ""})`,
metadata: { ...metadata, hasAudio: true, audioTitle: audio.title, audioDuration: audio.duration },
};
}

View file

@ -14,7 +14,7 @@
import { execFile } from "node:child_process";
import * as fs from "node:fs";
import * as path from "node:path";
import type { TranscriptionConfig } from "../types.ts";
import type { TranscriptionConfig } from "../types.js";
// ── Public interface ────────────────────────────────────────────

View file

@ -11,7 +11,7 @@
* }
*/
import type { AdapterConfig, ChannelAdapter, ChannelMessage } from "../types.ts";
import type { AdapterConfig, ChannelAdapter, ChannelMessage } from "../types.js";
export function createWebhookAdapter(config: AdapterConfig): ChannelAdapter {
const method = (config.method as string) ?? "POST";

View file

@ -2,18 +2,18 @@
* pi-channels Chat bridge.
*
* Listens for incoming messages (channel:receive), serializes per sender,
* runs prompts via isolated subprocesses, and sends responses back via
* the same adapter. Each sender gets their own FIFO queue. Multiple
* senders run concurrently up to maxConcurrent.
* routes prompts into the live pi gateway runtime, and sends responses
* back via the same adapter. Each sender gets their own FIFO queue.
* Multiple senders run concurrently up to maxConcurrent.
*/
import type { EventBus } from "@mariozechner/pi-coding-agent";
import type { ChannelRegistry } from "../registry.ts";
import type { BridgeConfig, IncomingAttachment, IncomingMessage, QueuedPrompt, SenderSession } from "../types.ts";
import { type CommandContext, handleCommand, isCommand } from "./commands.ts";
import { RpcSessionManager } from "./rpc-runner.ts";
import { runPrompt } from "./runner.ts";
import { startTyping } from "./typing.ts";
import { readFileSync } from "node:fs";
import type { ImageContent } from "@mariozechner/pi-ai";
import { type EventBus, getActiveGatewayRuntime } from "@mariozechner/pi-coding-agent";
import type { ChannelRegistry } from "../registry.js";
import type { BridgeConfig, IncomingMessage, QueuedPrompt, SenderSession } from "../types.js";
import { type CommandContext, handleCommand, isCommand } from "./commands.js";
import { startTyping } from "./typing.js";
const BRIDGE_DEFAULTS: Required<BridgeConfig> = {
enabled: false,
@ -38,24 +38,21 @@ function nextId(): string {
export class ChatBridge {
private config: Required<BridgeConfig>;
private cwd: string;
private registry: ChannelRegistry;
private events: EventBus;
private log: LogFn;
private sessions = new Map<string, SenderSession>();
private activeCount = 0;
private running = false;
private rpcManager: RpcSessionManager | null = null;
constructor(
bridgeConfig: BridgeConfig | undefined,
cwd: string,
_cwd: string,
registry: ChannelRegistry,
events: EventBus,
log: LogFn = () => {},
) {
this.config = { ...BRIDGE_DEFAULTS, ...bridgeConfig };
this.cwd = cwd;
this.registry = registry;
this.events = events;
this.log = log;
@ -65,18 +62,11 @@ export class ChatBridge {
start(): void {
if (this.running) return;
if (!getActiveGatewayRuntime()) {
this.log("bridge-unavailable", { reason: "no active pi gateway runtime" }, "WARN");
return;
}
this.running = true;
// Always create the RPC manager — it's used on-demand for persistent senders
this.rpcManager = new RpcSessionManager(
{
cwd: this.cwd,
model: this.config.model,
timeoutMs: this.config.timeoutMs,
extensions: this.config.extensions,
},
this.config.idleTimeoutMinutes * 60_000,
);
}
stop(): void {
@ -86,8 +76,6 @@ export class ChatBridge {
}
this.sessions.clear();
this.activeCount = 0;
this.rpcManager?.killAll();
this.rpcManager = null;
}
isActive(): boolean {
@ -180,38 +168,32 @@ export class ChatBridge {
// Typing indicator
const adapter = this.registry.getAdapter(prompt.adapter);
const typing = this.config.typingIndicators ? startTyping(adapter, prompt.sender) : { stop() {} };
const ac = new AbortController();
session.abortController = ac;
const usePersistent = this.shouldUsePersistent(senderKey);
const gateway = getActiveGatewayRuntime();
if (!gateway) {
typing.stop();
session.processing = false;
this.activeCount--;
this.sendReply(prompt.adapter, prompt.sender, "❌ pi gateway is not running.");
return;
}
this.events.emit("bridge:start", {
id: prompt.id,
adapter: prompt.adapter,
sender: prompt.sender,
text: prompt.text.slice(0, 100),
persistent: usePersistent,
persistent: true,
});
try {
let result;
if (usePersistent && this.rpcManager) {
// Persistent mode: use RPC session
result = await this.runWithRpc(senderKey, prompt, ac.signal);
} else {
// Stateless mode: spawn subprocess
result = await runPrompt({
prompt: prompt.text,
cwd: this.cwd,
timeoutMs: this.config.timeoutMs,
model: this.config.model,
signal: ac.signal,
attachments: prompt.attachments,
extensions: this.config.extensions,
});
}
session.abortController = new AbortController();
const result = await gateway.enqueueMessage({
sessionKey: senderKey,
text: buildPromptText(prompt),
images: collectImageAttachments(prompt.attachments),
source: "extension",
metadata: prompt.metadata,
});
typing.stop();
@ -229,8 +211,7 @@ export class ChatBridge {
adapter: prompt.adapter,
sender: prompt.sender,
ok: result.ok,
durationMs: result.durationMs,
persistent: usePersistent,
persistent: true,
});
this.log(
"bridge-complete",
@ -238,15 +219,15 @@ export class ChatBridge {
id: prompt.id,
adapter: prompt.adapter,
ok: result.ok,
durationMs: result.durationMs,
persistent: usePersistent,
persistent: true,
},
result.ok ? "INFO" : "WARN",
);
} catch (err: any) {
} catch (err: unknown) {
typing.stop();
this.log("bridge-error", { adapter: prompt.adapter, sender: prompt.sender, error: err.message }, "ERROR");
this.sendReply(prompt.adapter, prompt.sender, `❌ Unexpected error: ${err.message}`);
const message = err instanceof Error ? err.message : String(err);
this.log("bridge-error", { adapter: prompt.adapter, sender: prompt.sender, error: message }, "ERROR");
this.sendReply(prompt.adapter, prompt.sender, `❌ Unexpected error: ${message}`);
} finally {
session.abortController = null;
session.processing = false;
@ -257,29 +238,6 @@ export class ChatBridge {
}
}
/** Run a prompt via persistent RPC session. */
private async runWithRpc(
senderKey: string,
prompt: QueuedPrompt,
signal?: AbortSignal,
): Promise<import("../types.ts").RunResult> {
try {
const rpcSession = await this.rpcManager!.getSession(senderKey);
return await rpcSession.runPrompt(prompt.text, {
signal,
attachments: prompt.attachments,
});
} catch (err: any) {
return {
ok: false,
response: "",
error: err.message,
durationMs: 0,
exitCode: 1,
};
}
}
/** After a slot frees up, check other senders waiting for concurrency. */
private drainWaiting(): void {
if (this.activeCount >= this.config.maxConcurrent) return;
@ -327,37 +285,17 @@ export class ChatBridge {
return this.sessions;
}
// ── Session mode resolution ───────────────────────────────
/**
* Determine if a sender should use persistent (RPC) or stateless mode.
* Checks sessionRules first (first match wins), falls back to sessionMode default.
*/
private shouldUsePersistent(senderKey: string): boolean {
for (const rule of this.config.sessionRules) {
if (globMatch(rule.match, senderKey)) {
return rule.mode === "persistent";
}
}
return this.config.sessionMode === "persistent";
}
// ── Command context ───────────────────────────────────────
private commandContext(): CommandContext {
const gateway = getActiveGatewayRuntime();
return {
isPersistent: (sender: string) => {
// Find the sender key to check mode
for (const [key, session] of this.sessions) {
if (session.sender === sender) return this.shouldUsePersistent(key);
}
return this.config.sessionMode === "persistent";
},
isPersistent: () => true,
abortCurrent: (sender: string): boolean => {
for (const session of this.sessions.values()) {
if (!gateway) return false;
for (const [key, session] of this.sessions) {
if (session.sender === sender && session.abortController) {
session.abortController.abort();
return true;
return gateway.abortSession(key);
}
}
return false;
@ -368,13 +306,11 @@ export class ChatBridge {
}
},
resetSession: (sender: string): void => {
if (!gateway) return;
for (const [key, session] of this.sessions) {
if (session.sender === sender) {
this.sessions.delete(key);
// Also reset persistent RPC session
if (this.rpcManager) {
this.rpcManager.resetSession(key).catch(() => {});
}
void gateway.resetSession(key);
}
}
},
@ -388,21 +324,6 @@ export class ChatBridge {
}
}
// ── Helpers ───────────────────────────────────────────────────
/**
* Simple glob matcher supporting `*` (any chars) and `?` (single char).
* Used for sessionRules pattern matching against "adapter:senderId" keys.
*/
function globMatch(pattern: string, text: string): boolean {
// Escape regex special chars except * and ?
const re = pattern
.replace(/[.+^${}()|[\]\\]/g, "\\$&")
.replace(/\*/g, ".*")
.replace(/\?/g, ".");
return new RegExp(`^${re}$`).test(text);
}
const MAX_ERROR_LENGTH = 200;
/**
@ -428,5 +349,36 @@ function sanitizeError(error: string | undefined): string {
const msg = meaningful?.trim() || "Something went wrong. Please try again.";
return msg.length > MAX_ERROR_LENGTH ? msg.slice(0, MAX_ERROR_LENGTH) + "…" : msg;
return msg.length > MAX_ERROR_LENGTH ? `${msg.slice(0, MAX_ERROR_LENGTH)}` : msg;
}
function collectImageAttachments(attachments: QueuedPrompt["attachments"]): ImageContent[] | undefined {
if (!attachments || attachments.length === 0) {
return undefined;
}
const images = attachments
.filter((attachment) => attachment.type === "image")
.map((attachment) => ({
type: "image" as const,
data: readFileSync(attachment.path).toString("base64"),
mimeType: attachment.mimeType || "image/jpeg",
}));
return images.length > 0 ? images : undefined;
}
function buildPromptText(prompt: QueuedPrompt): string {
if (!prompt.attachments || prompt.attachments.length === 0) {
return prompt.text;
}
const attachmentNotes = prompt.attachments
.filter((attachment) => attachment.type !== "image")
.map((attachment) => {
const label = attachment.filename ?? attachment.path;
return `Attachment (${attachment.type}): ${label}`;
});
if (attachmentNotes.length === 0) {
return prompt.text;
}
return `${prompt.text}\n\n${attachmentNotes.join("\n")}`;
}

View file

@ -7,7 +7,7 @@
* Built-in: /start, /help, /abort, /status, /new
*/
import type { SenderSession } from "../types.ts";
import type { SenderSession } from "../types.js";
export interface BotCommand {
name: string;

View file

@ -14,7 +14,7 @@
import { type ChildProcess, spawn } from "node:child_process";
import * as readline from "node:readline";
import type { IncomingAttachment, RunResult } from "../types.ts";
import type { IncomingAttachment, RunResult } from "../types.js";
export interface RpcRunnerOptions {
cwd: string;
@ -118,95 +118,97 @@ export class RpcSession {
onStreaming?: (text: string) => void;
},
): Promise<RunResult> {
return new Promise(async (resolve) => {
// Ensure subprocess is running
if (!this.ready) {
const ok = await this.start();
if (!ok) {
resolve({
ok: false,
response: "",
error: "Failed to start RPC session",
durationMs: 0,
exitCode: 1,
});
return;
}
}
const startTime = Date.now();
this._onStreaming = options?.onStreaming ?? null;
// Timeout
const timer = setTimeout(() => {
if (this.pending) {
const p = this.pending;
this.pending = null;
const text = p.textChunks.join("");
p.resolve({
ok: false,
response: text || "(timed out)",
error: "Timeout",
durationMs: Date.now() - p.startTime,
exitCode: 124,
});
// Kill and restart on next message
this.cleanup();
}
}, this.options.timeoutMs);
this.pending = { resolve, startTime, timer, textChunks: [] };
// Abort handler
const onAbort = () => {
this.sendCommand({ type: "abort" });
};
if (options?.signal) {
if (options.signal.aborted) {
clearTimeout(timer);
this.pending = null;
this.sendCommand({ type: "abort" });
resolve({
ok: false,
response: "(aborted)",
error: "Aborted by user",
durationMs: Date.now() - startTime,
exitCode: 130,
});
return;
}
options.signal.addEventListener("abort", onAbort, { once: true });
this.pending.abortHandler = () => options.signal?.removeEventListener("abort", onAbort);
}
// Build prompt command
const cmd: Record<string, unknown> = {
type: "prompt",
message: prompt,
};
// Attach images as base64
if (options?.attachments?.length) {
const images: Array<Record<string, string>> = [];
for (const att of options.attachments) {
if (att.type === "image") {
try {
const fs = await import("node:fs");
const data = fs.readFileSync(att.path).toString("base64");
images.push({
type: "image",
data,
mimeType: att.mimeType || "image/jpeg",
});
} catch {
// Skip unreadable attachments
}
return new Promise((resolve) => {
void (async () => {
// Ensure subprocess is running
if (!this.ready) {
const ok = await this.start();
if (!ok) {
resolve({
ok: false,
response: "",
error: "Failed to start RPC session",
durationMs: 0,
exitCode: 1,
});
return;
}
}
if (images.length > 0) cmd.images = images;
}
this.sendCommand(cmd);
const startTime = Date.now();
this._onStreaming = options?.onStreaming ?? null;
// Timeout
const timer = setTimeout(() => {
if (this.pending) {
const p = this.pending;
this.pending = null;
const text = p.textChunks.join("");
p.resolve({
ok: false,
response: text || "(timed out)",
error: "Timeout",
durationMs: Date.now() - p.startTime,
exitCode: 124,
});
// Kill and restart on next message
this.cleanup();
}
}, this.options.timeoutMs);
this.pending = { resolve, startTime, timer, textChunks: [] };
// Abort handler
const onAbort = () => {
this.sendCommand({ type: "abort" });
};
if (options?.signal) {
if (options.signal.aborted) {
clearTimeout(timer);
this.pending = null;
this.sendCommand({ type: "abort" });
resolve({
ok: false,
response: "(aborted)",
error: "Aborted by user",
durationMs: Date.now() - startTime,
exitCode: 130,
});
return;
}
options.signal.addEventListener("abort", onAbort, { once: true });
this.pending.abortHandler = () => options.signal?.removeEventListener("abort", onAbort);
}
// Build prompt command
const cmd: Record<string, unknown> = {
type: "prompt",
message: prompt,
};
// Attach images as base64
if (options?.attachments?.length) {
const images: Array<Record<string, string>> = [];
for (const att of options.attachments) {
if (att.type === "image") {
try {
const fs = await import("node:fs");
const data = fs.readFileSync(att.path).toString("base64");
images.push({
type: "image",
data,
mimeType: att.mimeType || "image/jpeg",
});
} catch {
// Skip unreadable attachments
}
}
}
if (images.length > 0) cmd.images = images;
}
this.sendCommand(cmd);
})();
});
}
@ -253,7 +255,7 @@ export class RpcSession {
private sendCommand(cmd: Record<string, unknown>): void {
if (!this.child?.stdin?.writable) return;
this.child.stdin.write(JSON.stringify(cmd) + "\n");
this.child.stdin.write(`${JSON.stringify(cmd)}\n`);
}
private handleLine(line: string): void {
@ -358,7 +360,7 @@ export class RpcSessionManager {
/** Get or create a session for a sender. */
async getSession(senderKey: string): Promise<RpcSession> {
let session = this.sessions.get(senderKey);
if (session && session.isAlive()) {
if (session?.isAlive()) {
this.resetIdleTimer(senderKey);
return session;
}
@ -403,7 +405,7 @@ export class RpcSessionManager {
/** Kill all sessions. */
killAll(): void {
for (const [key, session] of this.sessions) {
for (const session of this.sessions.values()) {
session.cleanup();
}
this.sessions.clear();

View file

@ -7,7 +7,7 @@
*/
import { type ChildProcess, spawn } from "node:child_process";
import type { IncomingAttachment, RunResult } from "../types.ts";
import type { IncomingAttachment, RunResult } from "../types.js";
export interface RunOptions {
prompt: string;

View file

@ -6,7 +6,7 @@
* For adapters without sendTyping, this is a no-op.
*/
import type { ChannelAdapter } from "../types.ts";
import type { ChannelAdapter } from "../types.js";
const TYPING_INTERVAL_MS = 4_000;

View file

@ -29,7 +29,7 @@
*/
import { getAgentDir, SettingsManager } from "@mariozechner/pi-coding-agent";
import type { ChannelConfig } from "./types.ts";
import type { ChannelConfig } from "./types.js";
const SETTINGS_KEY = "pi-channels";

View file

@ -15,9 +15,9 @@
*/
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import type { ChatBridge } from "./bridge/bridge.ts";
import type { ChannelRegistry } from "./registry.ts";
import type { ChannelAdapter, ChannelMessage, IncomingMessage } from "./types.ts";
import type { ChatBridge } from "./bridge/bridge.js";
import type { ChannelRegistry } from "./registry.js";
import type { ChannelAdapter, ChannelMessage, IncomingMessage } from "./types.js";
/** Reference to the active bridge, set by index.ts after construction. */
let activeBridge: ChatBridge | null = null;

View file

@ -35,12 +35,12 @@
*/
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { ChatBridge } from "./bridge/bridge.ts";
import { loadConfig } from "./config.ts";
import { registerChannelEvents, setBridge } from "./events.ts";
import { createLogger } from "./logger.ts";
import { ChannelRegistry } from "./registry.ts";
import { registerChannelTool } from "./tool.ts";
import { ChatBridge } from "./bridge/bridge.js";
import { loadConfig } from "./config.js";
import { registerChannelEvents, setBridge } from "./events.js";
import { createLogger } from "./logger.js";
import { ChannelRegistry } from "./registry.js";
import { registerChannelTool } from "./tool.js";
export default function (pi: ExtensionAPI) {
const log = createLogger(pi);

View file

@ -2,9 +2,9 @@
* pi-channels Adapter registry + route resolution.
*/
import { createSlackAdapter } from "./adapters/slack.ts";
import { createTelegramAdapter } from "./adapters/telegram.ts";
import { createWebhookAdapter } from "./adapters/webhook.ts";
import { createSlackAdapter } from "./adapters/slack.js";
import { createTelegramAdapter } from "./adapters/telegram.js";
import { createWebhookAdapter } from "./adapters/webhook.js";
import type {
AdapterConfig,
AdapterDirection,
@ -13,7 +13,7 @@ import type {
ChannelMessage,
IncomingMessage,
OnIncomingMessage,
} from "./types.ts";
} from "./types.js";
// ── Built-in adapter factories ──────────────────────────────────

View file

@ -5,7 +5,7 @@
import { StringEnum } from "@mariozechner/pi-ai";
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import { Type } from "@sinclair/typebox";
import type { ChannelRegistry } from "./registry.ts";
import type { ChannelRegistry } from "./registry.js";
interface ChannelToolParams {
action: "send" | "list" | "test";

View file

@ -6,7 +6,7 @@
*/
import { spawnSync } from "node:child_process";
import { execCommand, type SpawnOptions, type TerminalAdapter } from "../utils/terminal-adapter";
import type { SpawnOptions, TerminalAdapter } from "../utils/terminal-adapter";
/**
* Context needed for iTerm2 spawning (tracks last pane for layout)

View file

@ -39,7 +39,7 @@ describe("WezTermAdapter", () => {
describe("spawn", () => {
it("should spawn first pane to the right with 50%", () => {
// Mock getPanes finding only current pane
mockExecCommand.mockImplementation((bin, args) => {
mockExecCommand.mockImplementation((_bin: string, args: string[]) => {
if (args.includes("list")) {
return {
stdout: JSON.stringify([{ pane_id: 0, tab_id: 0 }]),
@ -69,7 +69,7 @@ describe("WezTermAdapter", () => {
it("should spawn subsequent panes by splitting the sidebar", () => {
// Mock getPanes finding current pane (0) and sidebar pane (1)
mockExecCommand.mockImplementation((bin, args) => {
mockExecCommand.mockImplementation((_bin: string, args: string[]) => {
if (args.includes("list")) {
return {
stdout: JSON.stringify([

View file

@ -1,13 +1,12 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { withLock } from "./lock";
describe("withLock race conditions", () => {
const testDir = path.join(os.tmpdir(), "pi-lock-race-test-" + Date.now());
const testDir = path.join(os.tmpdir(), `pi-lock-race-test-${Date.now()}`);
const lockPath = path.join(testDir, "test");
const lockFile = `${lockPath}.lock`;
beforeEach(() => {
if (!fs.existsSync(testDir)) fs.mkdirSync(testDir, { recursive: true });

View file

@ -7,7 +7,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { withLock } from "./lock";
describe("withLock", () => {
const testDir = path.join(os.tmpdir(), "pi-lock-test-" + Date.now());
const testDir = path.join(os.tmpdir(), `pi-lock-test-${Date.now()}`);
const lockPath = path.join(testDir, "test");
const lockFile = `${lockPath}.lock`;

View file

@ -1,8 +1,6 @@
// Project: pi-teams
import fs from "node:fs";
import path from "node:path";
const LOCK_TIMEOUT = 5000; // 5 seconds of retrying
const STALE_LOCK_TIMEOUT = 30000; // 30 seconds for a lock to be considered stale
export async function withLock<T>(lockPath: string, fn: () => Promise<T>, retries: number = 50): Promise<T> {
@ -18,7 +16,7 @@ export async function withLock<T>(lockPath: string, fn: () => Promise<T>, retrie
// Attempt to remove stale lock
try {
fs.unlinkSync(lockFile);
} catch (e) {
} catch (_error) {
// ignore, another process might have already removed it
}
}
@ -26,7 +24,7 @@ export async function withLock<T>(lockPath: string, fn: () => Promise<T>, retrie
fs.writeFileSync(lockFile, process.pid.toString(), { flag: "wx" });
break;
} catch (e) {
} catch (_error) {
retries--;
await new Promise((resolve) => setTimeout(resolve, 100));
}
@ -41,7 +39,7 @@ export async function withLock<T>(lockPath: string, fn: () => Promise<T>, retrie
} finally {
try {
fs.unlinkSync(lockFile);
} catch (e) {
} catch (_error) {
// ignore
}
}

View file

@ -6,7 +6,7 @@ import { appendMessage, broadcastMessage, readInbox, sendPlainMessage } from "./
import * as paths from "./paths";
// Mock the paths to use a temporary directory
const testDir = path.join(os.tmpdir(), "pi-teams-test-" + Date.now());
const testDir = path.join(os.tmpdir(), `pi-teams-test-${Date.now()}`);
describe("Messaging Utilities", () => {
beforeEach(() => {
@ -14,11 +14,11 @@ describe("Messaging Utilities", () => {
fs.mkdirSync(testDir, { recursive: true });
// Override paths to use testDir
vi.spyOn(paths, "inboxPath").mockImplementation((teamName, agentName) => {
vi.spyOn(paths, "inboxPath").mockImplementation((_teamName, agentName) => {
return path.join(testDir, "inboxes", `${agentName}.json`);
});
vi.spyOn(paths, "teamDir").mockReturnValue(testDir);
vi.spyOn(paths, "configPath").mockImplementation((teamName) => {
vi.spyOn(paths, "configPath").mockImplementation((_teamName) => {
return path.join(testDir, "config.json");
});
});

View file

@ -103,6 +103,8 @@ export async function broadcastMessage(
if (failures.length > 0) {
console.error(`Broadcast partially failed: ${failures.length} messages could not be delivered.`);
// Optionally log individual errors
failures.forEach((f) => console.error(`- Delivery error:`, f.reason));
for (const failure of failures) {
console.error("- Delivery error:", failure.reason);
}
}
}

View file

@ -1,6 +1,3 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { inboxPath, sanitizeName, teamDir } from "./paths";
@ -17,7 +14,6 @@ describe("Security Audit - Path Traversal (Prevention Check)", () => {
});
it("should throw an error for path traversal via taskId", () => {
const teamName = "audit-team";
const maliciousTaskId = "../../../etc/passwd";
// We need to import readTask/updateTask or just sanitizeName directly if we want to test the logic
// But since we already tested sanitizeName via other paths, this is just for completeness.

View file

@ -3,9 +3,9 @@ import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import * as paths from "./paths";
import { createTask, listTasks } from "./tasks";
import { createTask } from "./tasks";
const testDir = path.join(os.tmpdir(), "pi-tasks-race-test-" + Date.now());
const testDir = path.join(os.tmpdir(), `pi-tasks-race-test-${Date.now()}`);
describe("Tasks Race Condition Bug", () => {
beforeEach(() => {

View file

@ -6,10 +6,9 @@ import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import * as paths from "./paths";
import { createTask, evaluatePlan, listTasks, readTask, submitPlan, updateTask } from "./tasks";
import * as teams from "./teams";
// Mock the paths to use a temporary directory
const testDir = path.join(os.tmpdir(), "pi-teams-test-" + Date.now());
const testDir = path.join(os.tmpdir(), `pi-teams-test-${Date.now()}`);
describe("Tasks Utilities", () => {
beforeEach(() => {

View file

@ -10,7 +10,7 @@ import { teamExists } from "./teams";
export function getTaskId(teamName: string): string {
const dir = taskDir(teamName);
const files = fs.readdirSync(dir).filter((f) => f.endsWith(".json"));
const ids = files.map((f) => parseInt(path.parse(f).name, 10)).filter((id) => !isNaN(id));
const ids = files.map((f) => parseInt(path.parse(f).name, 10)).filter((id) => !Number.isNaN(id));
return ids.length > 0 ? (Math.max(...ids) + 1).toString() : "1";
}
@ -169,7 +169,7 @@ export async function listTasks(teamName: string): Promise<TaskFile[]> {
const tasks: TaskFile[] = files
.map((f) => {
const id = parseInt(path.parse(f).name, 10);
if (isNaN(id)) return null;
if (Number.isNaN(id)) return null;
return JSON.parse(fs.readFileSync(path.join(dir, f), "utf-8"));
})
.filter((t) => t !== null);

View file

@ -1,5 +1,4 @@
import fs from "node:fs";
import path from "node:path";
import { withLock } from "./lock";
import type { Member, TeamConfig } from "./models";
import { configPath, taskDir, teamDir } from "./paths";