mirror of
https://github.com/harivansh-afk/pi-telegram-webhook.git
synced 2026-04-15 05:02:11 +00:00
initial: webhook telegram adapter for pi with streaming replies
- webhook server with secret validation, rate limiting, body guards - streaming replies via sendMessage + editMessageText throttled loop - RPC session management for persistent conversations - 15/15 tests passing
This commit is contained in:
parent
809e9b1df5
commit
ce9abc2a8e
18 changed files with 6991 additions and 1 deletions
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
node_modules/\ndist/\n.pi/
|
||||
30
.pi/agents/worker.md
Normal file
30
.pi/agents/worker.md
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
---
|
||||
name: worker
|
||||
description: Build pi-telegram-webhook extension
|
||||
tools: read, bash, edit, write
|
||||
model: anthropic/claude-sonnet-4-5
|
||||
thinking: xhigh
|
||||
---
|
||||
|
||||
You are building a new pi extension package called pi-telegram-webhook.
|
||||
|
||||
You have access to these reference codebases:
|
||||
- OpenClaw telegram extension (webhook + streaming): ~/Documents/GitHub/nix/tmp/openclaw/extensions/telegram/
|
||||
- Pi mono repo (pi SDK, agent core, TUI): ~/Documents/GitHub/nix/tmp/pi-mono/
|
||||
- Pi channels (existing telegram polling adapter): ~/.local/share/npm/lib/node_modules/@e9n/pi-channels/
|
||||
- Pi coding agent (extension API): ~/.local/share/npm/lib/node_modules/@mariozechner/pi-coding-agent/
|
||||
|
||||
Follow pi extension/package conventions from the pi mono repo. This is a pi package installable via `pi install npm:pi-telegram-webhook`.
|
||||
|
||||
Key requirements:
|
||||
- Telegram webhook ingress (HTTP server, setWebhook, secret token validation, rate limiting)
|
||||
- Streaming message delivery (sendMessage then editMessageText on throttled loop)
|
||||
- Clean, tested, concise TypeScript
|
||||
- No grammy dependency — use raw Telegram Bot API via fetch like pi-channels does
|
||||
- Must work behind nginx reverse proxy (support X-Forwarded-For, trusted proxies)
|
||||
- Health check endpoint
|
||||
- Graceful shutdown (deleteWebhook on stop)
|
||||
- Configurable via pi settings.json under "pi-telegram-webhook" key
|
||||
- Follow the same adapter/bridge pattern as pi-channels for compatibility
|
||||
|
||||
Do NOT use markdown formatting in any user-facing output. Write code only.
|
||||
22
.pi/task.md
Normal file
22
.pi/task.md
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
Build the pi-telegram-webhook extension. Read these references first:
|
||||
|
||||
1. ~/Documents/GitHub/nix/tmp/openclaw/extensions/telegram/src/webhook.ts
|
||||
2. ~/Documents/GitHub/nix/tmp/openclaw/extensions/telegram/src/draft-stream.ts
|
||||
3. ~/.local/share/npm/lib/node_modules/@e9n/pi-channels/src/index.ts
|
||||
4. ~/.local/share/npm/lib/node_modules/@e9n/pi-channels/src/adapters/telegram.ts
|
||||
5. ~/.local/share/npm/lib/node_modules/@e9n/pi-channels/src/bridge/bridge.ts
|
||||
6. ~/.local/share/npm/lib/node_modules/@e9n/pi-channels/src/bridge/rpc-runner.ts
|
||||
7. ~/Documents/GitHub/nix/tmp/pi-mono/packages/coding-agent/package.json
|
||||
|
||||
Then build a focused pi extension package in this repo with:
|
||||
- package.json with pi.extensions entry for npm publish
|
||||
- src/index.ts (pi extension entry via ExtensionAPI)
|
||||
- src/webhook-server.ts (HTTP server, secret validation, rate limiting, health endpoint, trusted proxy)
|
||||
- src/streaming-reply.ts (sendMessage + editMessageText throttled loop, 4096 cap, generation tracking)
|
||||
- src/telegram-api.ts (thin fetch wrapper for Telegram Bot API)
|
||||
- src/bridge.ts (incoming message queue, RPC session management, connect webhook to agent)
|
||||
- src/types.ts and src/config.ts
|
||||
- tests/ with vitest
|
||||
- tsconfig.json, vitest.config.ts, README.md
|
||||
|
||||
Config key: "pi-telegram-webhook" in settings.json. No grammy. Raw fetch only. Concise.
|
||||
162
README.md
162
README.md
|
|
@ -1,2 +1,162 @@
|
|||
# pi-telegram-webhook
|
||||
Pi extension for Telegram webhook ingress and streaming message delivery
|
||||
|
||||
Webhook-based Telegram adapter for [pi](https://github.com/badlogic/pi-mono) with streaming replies and persistent RPC sessions.
|
||||
|
||||
## Features
|
||||
|
||||
- **Webhook ingress** (vs polling) — lower latency, better for production
|
||||
- **Security hardened**:
|
||||
- Constant-time secret validation (timing attack resistant)
|
||||
- Fixed-window rate limiter per IP
|
||||
- 1MB body size limit
|
||||
- 30s read timeout
|
||||
- Trusted proxy X-Forwarded-For support
|
||||
- **Streaming replies** — throttled `editMessageText` for live updates (default 1s, 4096 char limit)
|
||||
- **Persistent RPC sessions** — `pi --mode rpc` subprocess per sender, context preserved across messages
|
||||
- **Generation tracking** — prevents new messages from clobbering old edits
|
||||
- **FIFO queues** — per-sender serialization, configurable concurrency
|
||||
- **Bot commands**: `/start`, `/help`, `/new`, `/abort`, `/clear`
|
||||
|
||||
## Installation
|
||||
|
||||
```bash
|
||||
npm install pi-telegram-webhook
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
Add to `.pi/settings.json`:
|
||||
|
||||
```json
|
||||
{
|
||||
"pi-telegram-webhook": {
|
||||
"botToken": "your-telegram-bot-token",
|
||||
"webhookUrl": "https://your-domain.com/telegram-webhook",
|
||||
"webhookPort": 2470,
|
||||
"webhookHost": "127.0.0.1",
|
||||
"webhookPath": "/telegram-webhook",
|
||||
"webhookSecret": "random-secret-token",
|
||||
"allowedChatIds": ["123456789"],
|
||||
"streamingThrottleMs": 1000,
|
||||
"minInitialChars": 50,
|
||||
"trustedProxies": ["127.0.0.1", "10.0.0.0/8"],
|
||||
"maxConcurrent": 2,
|
||||
"timeoutMs": 300000,
|
||||
"idleTimeoutMinutes": 30,
|
||||
"model": "anthropic/claude-sonnet-4",
|
||||
"extensions": ["pi-channels"]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Config Reference
|
||||
|
||||
| Key | Type | Default | Description |
|
||||
|-----|------|---------|-------------|
|
||||
| `botToken` | `string` | *required* | Telegram Bot API token |
|
||||
| `webhookUrl` | `string` | *required* | Public webhook URL (must be HTTPS in production) |
|
||||
| `webhookSecret` | `string` | *required* | Secret token for webhook validation |
|
||||
| `webhookPort` | `number` | `2470` | Local server port |
|
||||
| `webhookHost` | `string` | `127.0.0.1` | Local server bind address |
|
||||
| `webhookPath` | `string` | `/telegram-webhook` | Webhook endpoint path |
|
||||
| `allowedChatIds` | `string[]` | `undefined` | Whitelist of allowed chat IDs (optional) |
|
||||
| `streamingThrottleMs` | `number` | `1000` | Throttle interval for streaming edits |
|
||||
| `minInitialChars` | `number` | `50` | Min chars before sending first message (debounce) |
|
||||
| `trustedProxies` | `string[]` | `["127.0.0.1"]` | Trusted proxy IPs/CIDRs for X-Forwarded-For |
|
||||
| `maxConcurrent` | `number` | `2` | Max concurrent prompts across all senders |
|
||||
| `timeoutMs` | `number` | `300000` | Prompt timeout (5 min) |
|
||||
| `idleTimeoutMinutes` | `number` | `30` | RPC session idle timeout |
|
||||
| `model` | `string` | `undefined` | Override model for RPC sessions |
|
||||
| `extensions` | `string[]` | `[]` | Additional pi extensions to load |
|
||||
|
||||
## Deployment
|
||||
|
||||
### Reverse Proxy (nginx)
|
||||
|
||||
```nginx
|
||||
location /telegram-webhook {
|
||||
proxy_pass http://127.0.0.1:2470;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_http_version 1.1;
|
||||
proxy_read_timeout 35s;
|
||||
client_max_body_size 2M;
|
||||
}
|
||||
```
|
||||
|
||||
### Systemd Service
|
||||
|
||||
```ini
|
||||
[Unit]
|
||||
Description=pi Telegram webhook
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=youruser
|
||||
WorkingDirectory=/path/to/your/project
|
||||
ExecStart=/usr/bin/pi --no-tui
|
||||
Restart=on-failure
|
||||
RestartSec=10
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
```
|
||||
|
||||
### Webhook Setup
|
||||
|
||||
The extension automatically calls `setWebhook` on startup. To manually test:
|
||||
|
||||
```bash
|
||||
curl -X POST "https://api.telegram.org/bot<YOUR_TOKEN>/setWebhook" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"url": "https://your-domain.com/telegram-webhook",
|
||||
"secret_token": "your-secret"
|
||||
}'
|
||||
```
|
||||
|
||||
## Bot Commands
|
||||
|
||||
- `/start`, `/help` — Show welcome message
|
||||
- `/new` — Start a new conversation (clear context)
|
||||
- `/abort` — Abort the current task
|
||||
- `/clear` — Clear the message queue
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
Telegram → HTTPS webhook → nginx → pi-telegram-webhook
|
||||
├─ webhook-server (security, rate limiting)
|
||||
├─ bridge (FIFO queues, concurrency)
|
||||
└─ RPC sessions (streaming replies)
|
||||
```
|
||||
|
||||
### Flow
|
||||
|
||||
1. Incoming webhook → secret validation → rate limit check → body parsing
|
||||
2. Message enqueued in per-sender FIFO queue
|
||||
3. Bridge spawns/reuses `pi --mode rpc` subprocess for sender
|
||||
4. Prompt sent to RPC session, streaming deltas received
|
||||
5. StreamingReply sends initial message, then throttled edits
|
||||
6. Generation tracking prevents race conditions
|
||||
|
||||
## Development
|
||||
|
||||
```bash
|
||||
npm install
|
||||
npm run build
|
||||
npm test
|
||||
```
|
||||
|
||||
## Security Notes
|
||||
|
||||
- **Secret validation** uses constant-time comparison (timing attack resistant)
|
||||
- **Rate limiting** applied *before* secret check (prevents brute-force)
|
||||
- **Body size limit** prevents memory exhaustion
|
||||
- **Read timeout** prevents slowloris attacks
|
||||
- **Trusted proxies** must be explicitly configured for X-Forwarded-For
|
||||
|
||||
## License
|
||||
|
||||
MIT
|
||||
|
|
|
|||
5181
package-lock.json
generated
Normal file
5181
package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load diff
38
package.json
Normal file
38
package.json
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
{
|
||||
"name": "pi-telegram-webhook",
|
||||
"version": "0.1.0",
|
||||
"description": "Webhook-based Telegram adapter for pi with streaming replies and RPC sessions",
|
||||
"type": "module",
|
||||
"main": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts",
|
||||
"pi": {
|
||||
"extensions": ["./src/index.ts"]
|
||||
},
|
||||
"files": [
|
||||
"dist",
|
||||
"README.md"
|
||||
],
|
||||
"scripts": {
|
||||
"build": "tsc",
|
||||
"test": "vitest run",
|
||||
"dev": "tsc --watch",
|
||||
"prepublishOnly": "npm run build"
|
||||
},
|
||||
"keywords": [
|
||||
"pi",
|
||||
"telegram",
|
||||
"webhook",
|
||||
"extension"
|
||||
],
|
||||
"author": "",
|
||||
"license": "MIT",
|
||||
"peerDependencies": {
|
||||
"@mariozechner/pi-coding-agent": ">=0.60.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^20.0.0",
|
||||
"typescript": "^5.0.0",
|
||||
"vitest": "^2.0.0",
|
||||
"@mariozechner/pi-coding-agent": "^0.64.0"
|
||||
}
|
||||
}
|
||||
415
src/bridge.ts
Normal file
415
src/bridge.ts
Normal file
|
|
@ -0,0 +1,415 @@
|
|||
/**
|
||||
* pi-telegram-webhook — Bridge: incoming messages → RPC sessions → streaming replies.
|
||||
*/
|
||||
|
||||
import { spawn, type ChildProcess } from "node:child_process";
|
||||
import * as readline from "node:readline";
|
||||
import type { TelegramMessage, SenderSession, QueuedMessage, RunResult } from "./types.js";
|
||||
import type { TelegramAPI } from "./telegram-api.js";
|
||||
import { StreamingReply } from "./streaming-reply.js";
|
||||
|
||||
let msgIdCounter = 0;
|
||||
function nextMsgId(): string {
|
||||
return `msg-${Date.now()}-${++msgIdCounter}`;
|
||||
}
|
||||
|
||||
interface RpcSession {
|
||||
child: ChildProcess;
|
||||
rl: readline.Interface;
|
||||
ready: boolean;
|
||||
generation: number;
|
||||
}
|
||||
|
||||
export class Bridge {
|
||||
private api: TelegramAPI;
|
||||
private cwd: string;
|
||||
private sessions = new Map<string, SenderSession>();
|
||||
private rpcSessions = new Map<string, RpcSession>();
|
||||
private activeCount = 0;
|
||||
private running = false;
|
||||
private globalGeneration = 0;
|
||||
|
||||
constructor(
|
||||
private config: {
|
||||
allowedChatIds?: string[];
|
||||
maxConcurrent: number;
|
||||
timeoutMs: number;
|
||||
idleTimeoutMinutes: number;
|
||||
streamingThrottleMs: number;
|
||||
minInitialChars: number;
|
||||
model?: string;
|
||||
extensions?: string[];
|
||||
},
|
||||
api: TelegramAPI,
|
||||
cwd: string,
|
||||
private log: (msg: string) => void
|
||||
) {
|
||||
this.api = api;
|
||||
this.cwd = cwd;
|
||||
}
|
||||
|
||||
start(): void {
|
||||
this.running = true;
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
this.running = false;
|
||||
for (const session of this.sessions.values()) {
|
||||
session.abortController?.abort();
|
||||
}
|
||||
this.sessions.clear();
|
||||
for (const [chatId, rpc] of this.rpcSessions) {
|
||||
this.killRpc(chatId);
|
||||
}
|
||||
this.rpcSessions.clear();
|
||||
}
|
||||
|
||||
handleMessage(msg: TelegramMessage): void {
|
||||
if (!this.running) return;
|
||||
|
||||
const chatId = String(msg.chat.id);
|
||||
const text = msg.text?.trim();
|
||||
if (!text) return;
|
||||
|
||||
// Check allowed chats
|
||||
if (this.config.allowedChatIds && !this.config.allowedChatIds.includes(chatId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get or create session
|
||||
let session = this.sessions.get(chatId);
|
||||
if (!session) {
|
||||
session = {
|
||||
chatId,
|
||||
queue: [],
|
||||
processing: false,
|
||||
abortController: null,
|
||||
messageCount: 0,
|
||||
startedAt: Date.now(),
|
||||
};
|
||||
this.sessions.set(chatId, session);
|
||||
}
|
||||
|
||||
// Bot commands
|
||||
if (text.startsWith("/")) {
|
||||
const reply = this.handleCommand(text, chatId);
|
||||
if (reply) {
|
||||
void this.api.sendMessage(chatId, reply);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Queue depth check
|
||||
const maxQueue = 5;
|
||||
if (session.queue.length >= maxQueue) {
|
||||
void this.api.sendMessage(
|
||||
chatId,
|
||||
`⚠️ Queue full (${maxQueue} pending). Use /abort to cancel current task.`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Enqueue
|
||||
const queued: QueuedMessage = {
|
||||
id: nextMsgId(),
|
||||
chatId,
|
||||
text,
|
||||
enqueuedAt: Date.now(),
|
||||
};
|
||||
session.queue.push(queued);
|
||||
session.messageCount++;
|
||||
|
||||
this.processNext(chatId);
|
||||
}
|
||||
|
||||
private async processNext(chatId: string): Promise<void> {
|
||||
const session = this.sessions.get(chatId);
|
||||
if (!session || session.processing || session.queue.length === 0) return;
|
||||
if (this.activeCount >= this.config.maxConcurrent) return;
|
||||
|
||||
session.processing = true;
|
||||
this.activeCount++;
|
||||
const msg = session.queue.shift()!;
|
||||
|
||||
// Typing indicator
|
||||
void this.api.sendChatAction(chatId, "typing");
|
||||
|
||||
const ac = new AbortController();
|
||||
session.abortController = ac;
|
||||
|
||||
try {
|
||||
const result = await this.runWithRpc(chatId, msg.text, ac.signal);
|
||||
|
||||
if (result.ok) {
|
||||
// Response already streamed
|
||||
} else if (result.error === "Aborted") {
|
||||
void this.api.sendMessage(chatId, "⏹ Aborted.");
|
||||
} else {
|
||||
void this.api.sendMessage(chatId, result.response || `❌ ${result.error || "Error"}`);
|
||||
}
|
||||
} catch (err: unknown) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
this.log(`Bridge error: ${message}`);
|
||||
void this.api.sendMessage(chatId, `❌ Unexpected error: ${message}`);
|
||||
} finally {
|
||||
session.abortController = null;
|
||||
session.processing = false;
|
||||
this.activeCount--;
|
||||
|
||||
if (session.queue.length > 0) this.processNext(chatId);
|
||||
this.drainWaiting();
|
||||
}
|
||||
}
|
||||
|
||||
private drainWaiting(): void {
|
||||
if (this.activeCount >= this.config.maxConcurrent) return;
|
||||
for (const [chatId, session] of this.sessions) {
|
||||
if (!session.processing && session.queue.length > 0) {
|
||||
this.processNext(chatId);
|
||||
if (this.activeCount >= this.config.maxConcurrent) break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async runWithRpc(chatId: string, prompt: string, signal?: AbortSignal): Promise<RunResult> {
|
||||
const rpc = await this.getOrCreateRpc(chatId);
|
||||
if (!rpc) {
|
||||
return {
|
||||
ok: false,
|
||||
response: "",
|
||||
error: "Failed to start session",
|
||||
durationMs: 0,
|
||||
exitCode: 1,
|
||||
};
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const startTime = Date.now();
|
||||
const generation = ++this.globalGeneration;
|
||||
const streaming = new StreamingReply(this.api, chatId, generation, {
|
||||
throttleMs: this.config.streamingThrottleMs,
|
||||
minInitialChars: this.config.minInitialChars,
|
||||
});
|
||||
|
||||
let responded = false;
|
||||
const finish = (result: RunResult) => {
|
||||
if (responded) return;
|
||||
responded = true;
|
||||
void streaming.stop();
|
||||
resolve(result);
|
||||
};
|
||||
|
||||
// Timeout
|
||||
const timer = setTimeout(() => {
|
||||
finish({
|
||||
ok: false,
|
||||
response: streaming.getMessageId() ? "" : "(timed out)",
|
||||
error: "Timeout",
|
||||
durationMs: Date.now() - startTime,
|
||||
exitCode: 124,
|
||||
});
|
||||
this.killRpc(chatId);
|
||||
}, this.config.timeoutMs);
|
||||
|
||||
// Abort
|
||||
const onAbort = () => {
|
||||
clearTimeout(timer);
|
||||
this.sendRpcCommand(rpc, { type: "abort" });
|
||||
finish({
|
||||
ok: false,
|
||||
response: "",
|
||||
error: "Aborted",
|
||||
durationMs: Date.now() - startTime,
|
||||
exitCode: 130,
|
||||
});
|
||||
};
|
||||
|
||||
if (signal?.aborted) {
|
||||
clearTimeout(timer);
|
||||
finish({
|
||||
ok: false,
|
||||
response: "",
|
||||
error: "Aborted",
|
||||
durationMs: Date.now() - startTime,
|
||||
exitCode: 130,
|
||||
});
|
||||
return;
|
||||
}
|
||||
signal?.addEventListener("abort", onAbort, { once: true });
|
||||
|
||||
// RPC event handlers
|
||||
const originalLine = rpc.rl.listenerCount("line") > 0;
|
||||
const lineHandler = (line: string) => {
|
||||
if (streaming.getGeneration() !== generation) return; // Old generation
|
||||
|
||||
let event: Record<string, unknown>;
|
||||
try {
|
||||
event = JSON.parse(line);
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
|
||||
const type = event.type as string;
|
||||
|
||||
// Streaming deltas
|
||||
if (type === "message_update") {
|
||||
const delta = event.assistantMessageEvent as Record<string, unknown> | undefined;
|
||||
if (delta?.type === "text_delta" && typeof delta.delta === "string") {
|
||||
const current = streaming.getMessageId() ? "" : streaming["pending"] || "";
|
||||
streaming.update(current + delta.delta);
|
||||
}
|
||||
|
||||
// Error
|
||||
if (delta?.type === "done" && delta.reason === "error") {
|
||||
clearTimeout(timer);
|
||||
signal?.removeEventListener("abort", onAbort);
|
||||
finish({
|
||||
ok: false,
|
||||
response: "",
|
||||
error: "Agent error",
|
||||
durationMs: Date.now() - startTime,
|
||||
exitCode: 1,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Agent finished
|
||||
if (type === "agent_end") {
|
||||
clearTimeout(timer);
|
||||
signal?.removeEventListener("abort", onAbort);
|
||||
finish({
|
||||
ok: true,
|
||||
response: "",
|
||||
durationMs: Date.now() - startTime,
|
||||
exitCode: 0,
|
||||
});
|
||||
}
|
||||
|
||||
// Response errors
|
||||
if (type === "response") {
|
||||
const success = event.success as boolean;
|
||||
if (!success) {
|
||||
clearTimeout(timer);
|
||||
signal?.removeEventListener("abort", onAbort);
|
||||
finish({
|
||||
ok: false,
|
||||
response: "",
|
||||
error: (event.error as string) || "RPC command failed",
|
||||
durationMs: Date.now() - startTime,
|
||||
exitCode: 1,
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (!originalLine) {
|
||||
rpc.rl.on("line", lineHandler);
|
||||
}
|
||||
|
||||
// Send prompt
|
||||
this.sendRpcCommand(rpc, {
|
||||
type: "prompt",
|
||||
message: prompt,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private async getOrCreateRpc(chatId: string): Promise<RpcSession | null> {
|
||||
let rpc = this.rpcSessions.get(chatId);
|
||||
if (rpc?.ready) return rpc;
|
||||
|
||||
// Clean up dead session
|
||||
if (rpc) this.killRpc(chatId);
|
||||
|
||||
// Spawn new
|
||||
const args = ["--mode", "rpc", "--no-extensions"];
|
||||
if (this.config.model) args.push("--model", this.config.model);
|
||||
if (this.config.extensions?.length) {
|
||||
for (const ext of this.config.extensions) {
|
||||
args.push("-e", ext);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const child = spawn("pi", args, {
|
||||
cwd: this.cwd,
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
env: { ...process.env },
|
||||
});
|
||||
|
||||
if (!child.stdout || !child.stdin) return null;
|
||||
|
||||
const rl = readline.createInterface({ input: child.stdout });
|
||||
|
||||
rpc = { child, rl, ready: true, generation: 0 };
|
||||
this.rpcSessions.set(chatId, rpc);
|
||||
|
||||
child.on("close", () => {
|
||||
this.rpcSessions.delete(chatId);
|
||||
});
|
||||
|
||||
child.on("error", () => {
|
||||
this.killRpc(chatId);
|
||||
});
|
||||
|
||||
return rpc;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private sendRpcCommand(rpc: RpcSession, cmd: Record<string, unknown>): void {
|
||||
if (!rpc.child.stdin?.writable) return;
|
||||
rpc.child.stdin.write(JSON.stringify(cmd) + "\n");
|
||||
}
|
||||
|
||||
private killRpc(chatId: string): void {
|
||||
const rpc = this.rpcSessions.get(chatId);
|
||||
if (!rpc) return;
|
||||
|
||||
rpc.rl.close();
|
||||
rpc.child.kill("SIGTERM");
|
||||
setTimeout(() => {
|
||||
if (!rpc.child.killed) rpc.child.kill("SIGKILL");
|
||||
}, 3000);
|
||||
|
||||
this.rpcSessions.delete(chatId);
|
||||
}
|
||||
|
||||
private handleCommand(cmd: string, chatId: string): string | null {
|
||||
const lower = cmd.toLowerCase();
|
||||
|
||||
if (lower === "/start" || lower === "/help") {
|
||||
return "👋 Send me a message and I'll help you with coding tasks.";
|
||||
}
|
||||
|
||||
if (lower === "/new") {
|
||||
const rpc = this.rpcSessions.get(chatId);
|
||||
if (rpc) {
|
||||
this.sendRpcCommand(rpc, { type: "new_session" });
|
||||
return "✨ Started a new conversation.";
|
||||
}
|
||||
return "No active session.";
|
||||
}
|
||||
|
||||
if (lower === "/abort") {
|
||||
const session = this.sessions.get(chatId);
|
||||
if (session?.abortController) {
|
||||
session.abortController.abort();
|
||||
return "⏹ Aborted current task.";
|
||||
}
|
||||
return "No active task.";
|
||||
}
|
||||
|
||||
if (lower === "/clear") {
|
||||
const session = this.sessions.get(chatId);
|
||||
if (session) {
|
||||
session.queue.length = 0;
|
||||
return "🗑 Cleared queue.";
|
||||
}
|
||||
return "Queue is empty.";
|
||||
}
|
||||
|
||||
return null; // Unknown command
|
||||
}
|
||||
}
|
||||
34
src/config.ts
Normal file
34
src/config.ts
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* pi-telegram-webhook — Config loader.
|
||||
*/
|
||||
|
||||
import * as fs from "node:fs";
|
||||
import * as path from "node:path";
|
||||
import type { Config } from "./types.js";
|
||||
|
||||
const DEFAULTS: Partial<Config> = {
|
||||
webhookPort: 2470,
|
||||
webhookHost: "127.0.0.1",
|
||||
webhookPath: "/telegram-webhook",
|
||||
streamingThrottleMs: 1000,
|
||||
minInitialChars: 50,
|
||||
trustedProxies: ["127.0.0.1"],
|
||||
maxConcurrent: 2,
|
||||
timeoutMs: 300000,
|
||||
idleTimeoutMinutes: 30,
|
||||
};
|
||||
|
||||
export function loadConfig(cwd: string): Config | null {
|
||||
const settingsPath = path.join(cwd, ".pi", "settings.json");
|
||||
if (!fs.existsSync(settingsPath)) return null;
|
||||
|
||||
try {
|
||||
const settings = JSON.parse(fs.readFileSync(settingsPath, "utf-8"));
|
||||
const cfg = settings["pi-telegram-webhook"];
|
||||
if (!cfg) return null;
|
||||
|
||||
return { ...DEFAULTS, ...cfg } as Config;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
97
src/index.ts
Normal file
97
src/index.ts
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* pi-telegram-webhook — Webhook-based Telegram adapter for pi.
|
||||
*
|
||||
* Entry point for pi extension.
|
||||
*/
|
||||
|
||||
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
|
||||
import { loadConfig } from "./config.js";
|
||||
import { TelegramAPI } from "./telegram-api.js";
|
||||
import { startWebhookServer } from "./webhook-server.js";
|
||||
import { Bridge } from "./bridge.js";
|
||||
import type { TelegramUpdate } from "./types.js";
|
||||
|
||||
export default function (pi: ExtensionAPI) {
|
||||
let cleanup: (() => void) | null = null;
|
||||
|
||||
pi.on("session_start", async (_event, ctx) => {
|
||||
const config = loadConfig(ctx.cwd);
|
||||
if (!config) {
|
||||
ctx.ui.notify("pi-telegram-webhook: No config found in .pi/settings.json", "warning");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!config.botToken || !config.webhookSecret || !config.webhookUrl) {
|
||||
ctx.ui.notify(
|
||||
"pi-telegram-webhook: Missing required config (botToken, webhookSecret, webhookUrl)",
|
||||
"warning"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const api = new TelegramAPI(config.botToken);
|
||||
|
||||
// Verify bot token
|
||||
try {
|
||||
await api.getMe();
|
||||
} catch (err) {
|
||||
ctx.ui.notify(`pi-telegram-webhook: Invalid bot token: ${err}`, "error");
|
||||
return;
|
||||
}
|
||||
|
||||
// Set webhook
|
||||
try {
|
||||
await api.setWebhook(config.webhookUrl, config.webhookSecret);
|
||||
} catch (err) {
|
||||
ctx.ui.notify(`pi-telegram-webhook: Failed to set webhook: ${err}`, "error");
|
||||
return;
|
||||
}
|
||||
|
||||
// Create bridge
|
||||
const bridge = new Bridge(
|
||||
{
|
||||
allowedChatIds: config.allowedChatIds,
|
||||
maxConcurrent: config.maxConcurrent ?? 2,
|
||||
timeoutMs: config.timeoutMs ?? 300000,
|
||||
idleTimeoutMinutes: config.idleTimeoutMinutes ?? 30,
|
||||
streamingThrottleMs: config.streamingThrottleMs ?? 1000,
|
||||
minInitialChars: config.minInitialChars ?? 50,
|
||||
model: config.model,
|
||||
extensions: config.extensions,
|
||||
},
|
||||
api,
|
||||
ctx.cwd,
|
||||
(msg: string) => console.log(`[pi-telegram-webhook] ${msg}`)
|
||||
);
|
||||
|
||||
bridge.start();
|
||||
|
||||
// Start webhook server
|
||||
const { stop } = startWebhookServer({
|
||||
port: config.webhookPort ?? 2470,
|
||||
host: config.webhookHost ?? "127.0.0.1",
|
||||
path: config.webhookPath ?? "/telegram-webhook",
|
||||
secret: config.webhookSecret,
|
||||
trustedProxies: config.trustedProxies ?? ["127.0.0.1"],
|
||||
onUpdate: (update: TelegramUpdate) => {
|
||||
if (update.message) {
|
||||
bridge.handleMessage(update.message);
|
||||
}
|
||||
},
|
||||
log: (msg: string) => console.log(`[pi-telegram-webhook] ${msg}`),
|
||||
});
|
||||
|
||||
cleanup = () => {
|
||||
bridge.stop();
|
||||
stop();
|
||||
void api.deleteWebhook().catch(() => {});
|
||||
};
|
||||
|
||||
ctx.ui.notify("pi-telegram-webhook: Started", "info");
|
||||
});
|
||||
|
||||
pi.on("session_shutdown", async () => {
|
||||
cleanup?.();
|
||||
cleanup = null;
|
||||
});
|
||||
}
|
||||
108
src/streaming-reply.ts
Normal file
108
src/streaming-reply.ts
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* pi-telegram-webhook — Streaming reply manager.
|
||||
*
|
||||
* Sends initial message on first chunk, then throttled edits.
|
||||
* Tracks generations to prevent new messages from clobbering old edits.
|
||||
*/
|
||||
|
||||
import type { TelegramAPI } from "./telegram-api.js";
|
||||
|
||||
const MAX_TELEGRAM_LENGTH = 4096;
|
||||
|
||||
export class StreamingReply {
|
||||
private api: TelegramAPI;
|
||||
private chatId: string;
|
||||
private throttleMs: number;
|
||||
private minInitialChars: number;
|
||||
private messageId: number | null = null;
|
||||
private lastSent = "";
|
||||
private pending = "";
|
||||
private generation: number;
|
||||
private flushTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private lastFlushTime = 0;
|
||||
private stopped = false;
|
||||
|
||||
constructor(
|
||||
api: TelegramAPI,
|
||||
chatId: string,
|
||||
generation: number,
|
||||
opts: { throttleMs?: number; minInitialChars?: number }
|
||||
) {
|
||||
this.api = api;
|
||||
this.chatId = chatId;
|
||||
this.generation = generation;
|
||||
this.throttleMs = opts.throttleMs ?? 1000;
|
||||
this.minInitialChars = opts.minInitialChars ?? 50;
|
||||
}
|
||||
|
||||
update(text: string): void {
|
||||
if (this.stopped) return;
|
||||
this.pending = text;
|
||||
this.scheduleFlush();
|
||||
}
|
||||
|
||||
async flush(): Promise<void> {
|
||||
if (this.stopped) return;
|
||||
if (this.flushTimer) {
|
||||
clearTimeout(this.flushTimer);
|
||||
this.flushTimer = null;
|
||||
}
|
||||
await this.doFlush();
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
this.stopped = true;
|
||||
await this.flush();
|
||||
}
|
||||
|
||||
getMessageId(): number | null {
|
||||
return this.messageId;
|
||||
}
|
||||
|
||||
getGeneration(): number {
|
||||
return this.generation;
|
||||
}
|
||||
|
||||
private scheduleFlush(): void {
|
||||
if (this.flushTimer) return;
|
||||
|
||||
const now = Date.now();
|
||||
const elapsed = now - this.lastFlushTime;
|
||||
const delay = Math.max(0, this.throttleMs - elapsed);
|
||||
|
||||
this.flushTimer = setTimeout(() => {
|
||||
this.flushTimer = null;
|
||||
void this.doFlush();
|
||||
}, delay);
|
||||
}
|
||||
|
||||
private async doFlush(): Promise<void> {
|
||||
if (this.stopped && this.pending === this.lastSent) return;
|
||||
|
||||
const text = this.pending.trimEnd();
|
||||
if (!text || text === this.lastSent) return;
|
||||
|
||||
// Truncate if too long
|
||||
const truncated = text.length > MAX_TELEGRAM_LENGTH ? text.slice(0, MAX_TELEGRAM_LENGTH) : text;
|
||||
|
||||
try {
|
||||
if (this.messageId === null) {
|
||||
// Debounce first send for better push notification quality
|
||||
if (!this.stopped && truncated.length < this.minInitialChars) return;
|
||||
|
||||
const result = await this.api.sendMessage(this.chatId, truncated);
|
||||
this.messageId = result.message_id;
|
||||
} else {
|
||||
await this.api.editMessageText(this.chatId, this.messageId, truncated);
|
||||
}
|
||||
|
||||
this.lastSent = truncated;
|
||||
this.lastFlushTime = Date.now();
|
||||
} catch (err) {
|
||||
// Silently fail edits (message may have been deleted)
|
||||
if (this.messageId !== null) {
|
||||
this.stopped = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
97
src/telegram-api.ts
Normal file
97
src/telegram-api.ts
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* pi-telegram-webhook — Thin Telegram Bot API wrapper.
|
||||
*/
|
||||
|
||||
const BASE_URL = "https://api.telegram.org/bot";
|
||||
|
||||
export class TelegramAPI {
|
||||
private token: string;
|
||||
|
||||
constructor(token: string) {
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
private url(method: string): string {
|
||||
return `${BASE_URL}${this.token}/${method}`;
|
||||
}
|
||||
|
||||
private async request(method: string, body?: Record<string, unknown>): Promise<unknown> {
|
||||
const res = await fetch(this.url(method), {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: body ? JSON.stringify(body) : undefined,
|
||||
});
|
||||
|
||||
if (!res.ok) {
|
||||
const text = await res.text().catch(() => "unknown");
|
||||
throw new Error(`Telegram API ${method} failed (${res.status}): ${text}`);
|
||||
}
|
||||
|
||||
const data = await res.json();
|
||||
if (!(data as { ok: boolean }).ok) {
|
||||
throw new Error(`Telegram API ${method} failed: ${JSON.stringify(data)}`);
|
||||
}
|
||||
|
||||
return (data as { result: unknown }).result;
|
||||
}
|
||||
|
||||
async getMe(): Promise<unknown> {
|
||||
return this.request("getMe");
|
||||
}
|
||||
|
||||
async sendMessage(
|
||||
chatId: string | number,
|
||||
text: string,
|
||||
options?: { parse_mode?: string; reply_to_message_id?: number }
|
||||
): Promise<{ message_id: number }> {
|
||||
const body: Record<string, unknown> = { chat_id: chatId, text };
|
||||
if (options?.parse_mode) body.parse_mode = options.parse_mode;
|
||||
if (options?.reply_to_message_id) body.reply_to_message_id = options.reply_to_message_id;
|
||||
|
||||
return (await this.request("sendMessage", body)) as { message_id: number };
|
||||
}
|
||||
|
||||
async editMessageText(
|
||||
chatId: string | number,
|
||||
messageId: number,
|
||||
text: string,
|
||||
options?: { parse_mode?: string }
|
||||
): Promise<void> {
|
||||
const body: Record<string, unknown> = {
|
||||
chat_id: chatId,
|
||||
message_id: messageId,
|
||||
text,
|
||||
};
|
||||
if (options?.parse_mode) body.parse_mode = options.parse_mode;
|
||||
|
||||
await this.request("editMessageText", body);
|
||||
}
|
||||
|
||||
async deleteMessage(chatId: string | number, messageId: number): Promise<void> {
|
||||
await this.request("deleteMessage", {
|
||||
chat_id: chatId,
|
||||
message_id: messageId,
|
||||
});
|
||||
}
|
||||
|
||||
async sendChatAction(chatId: string | number, action = "typing"): Promise<void> {
|
||||
await this.request("sendChatAction", {
|
||||
chat_id: chatId,
|
||||
action,
|
||||
}).catch(() => {
|
||||
// Best-effort
|
||||
});
|
||||
}
|
||||
|
||||
async setWebhook(url: string, secret: string): Promise<void> {
|
||||
await this.request("setWebhook", {
|
||||
url,
|
||||
secret_token: secret,
|
||||
allowed_updates: ["message"],
|
||||
});
|
||||
}
|
||||
|
||||
async deleteWebhook(): Promise<void> {
|
||||
await this.request("deleteWebhook", { drop_pending_updates: false });
|
||||
}
|
||||
}
|
||||
66
src/types.ts
Normal file
66
src/types.ts
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* pi-telegram-webhook — Type definitions.
|
||||
*/
|
||||
|
||||
export interface Config {
|
||||
botToken: string;
|
||||
webhookUrl: string;
|
||||
webhookPort?: number;
|
||||
webhookHost?: string;
|
||||
webhookPath?: string;
|
||||
webhookSecret: string;
|
||||
allowedChatIds?: string[];
|
||||
streamingThrottleMs?: number;
|
||||
minInitialChars?: number;
|
||||
trustedProxies?: string[];
|
||||
maxConcurrent?: number;
|
||||
timeoutMs?: number;
|
||||
idleTimeoutMinutes?: number;
|
||||
model?: string;
|
||||
extensions?: string[];
|
||||
}
|
||||
|
||||
export interface TelegramMessage {
|
||||
message_id: number;
|
||||
from?: {
|
||||
id: number;
|
||||
username?: string;
|
||||
first_name?: string;
|
||||
};
|
||||
chat: {
|
||||
id: number;
|
||||
type: string;
|
||||
title?: string;
|
||||
};
|
||||
date: number;
|
||||
text?: string;
|
||||
}
|
||||
|
||||
export interface TelegramUpdate {
|
||||
update_id: number;
|
||||
message?: TelegramMessage;
|
||||
}
|
||||
|
||||
export interface SenderSession {
|
||||
chatId: string;
|
||||
queue: QueuedMessage[];
|
||||
processing: boolean;
|
||||
abortController: AbortController | null;
|
||||
messageCount: number;
|
||||
startedAt: number;
|
||||
}
|
||||
|
||||
export interface QueuedMessage {
|
||||
id: string;
|
||||
chatId: string;
|
||||
text: string;
|
||||
enqueuedAt: number;
|
||||
}
|
||||
|
||||
export interface RunResult {
|
||||
ok: boolean;
|
||||
response: string;
|
||||
error?: string;
|
||||
durationMs: number;
|
||||
exitCode: number;
|
||||
}
|
||||
287
src/webhook-server.ts
Normal file
287
src/webhook-server.ts
Normal file
|
|
@ -0,0 +1,287 @@
|
|||
/**
|
||||
* pi-telegram-webhook — HTTP webhook server.
|
||||
*
|
||||
* Security:
|
||||
* - Constant-time secret validation
|
||||
* - Fixed-window rate limiter per IP
|
||||
* - Body size limit (1MB)
|
||||
* - Read timeout (30s)
|
||||
* - Trusted proxy X-Forwarded-For support
|
||||
*/
|
||||
|
||||
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
|
||||
import * as net from "node:net";
|
||||
import { timingSafeEqual } from "node:crypto";
|
||||
import type { TelegramUpdate } from "./types.js";
|
||||
|
||||
const MAX_BODY_BYTES = 1_048_576; // 1MB
|
||||
const READ_TIMEOUT_MS = 30_000;
|
||||
const RATE_LIMIT_WINDOW_MS = 60_000;
|
||||
const RATE_LIMIT_MAX_REQUESTS = 60;
|
||||
const RATE_LIMIT_MAX_KEYS = 1000;
|
||||
|
||||
interface RateLimiter {
|
||||
check(key: string): boolean;
|
||||
}
|
||||
|
||||
function createFixedWindowRateLimiter(opts: {
|
||||
windowMs: number;
|
||||
maxRequests: number;
|
||||
maxTrackedKeys: number;
|
||||
}): RateLimiter {
|
||||
const windows = new Map<string, { count: number; resetAt: number }>();
|
||||
|
||||
return {
|
||||
check(key: string): boolean {
|
||||
const now = Date.now();
|
||||
const entry = windows.get(key);
|
||||
|
||||
if (entry && now < entry.resetAt) {
|
||||
if (entry.count >= opts.maxRequests) return false;
|
||||
entry.count++;
|
||||
return true;
|
||||
}
|
||||
|
||||
// New window
|
||||
if (windows.size >= opts.maxTrackedKeys) {
|
||||
// Evict expired entries
|
||||
for (const [k, v] of windows) {
|
||||
if (now >= v.resetAt) windows.delete(k);
|
||||
}
|
||||
// If still full, reject
|
||||
if (windows.size >= opts.maxTrackedKeys) return false;
|
||||
}
|
||||
|
||||
windows.set(key, { count: 1, resetAt: now + opts.windowMs });
|
||||
return true;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/** Constant-time string comparison (prevents timing attacks). */
|
||||
function constantTimeEqual(a: string | undefined, b: string): boolean {
|
||||
if (!a) return false;
|
||||
const aLen = Buffer.byteLength(a);
|
||||
const bLen = Buffer.byteLength(b);
|
||||
const bufA = Buffer.allocUnsafe(Math.max(aLen, bLen)).fill(0);
|
||||
const bufB = Buffer.allocUnsafe(Math.max(aLen, bLen)).fill(0);
|
||||
bufA.write(a);
|
||||
bufB.write(b);
|
||||
// Constant-time compare
|
||||
return timingSafeEqual(bufA, bufB) && aLen === bLen;
|
||||
}
|
||||
|
||||
async function readJsonBody(
|
||||
req: IncomingMessage,
|
||||
maxBytes: number,
|
||||
timeoutMs: number
|
||||
): Promise<{ ok: true; data: unknown } | { ok: false; error: string; code: number }> {
|
||||
return new Promise((resolve) => {
|
||||
const chunks: Buffer[] = [];
|
||||
let totalSize = 0;
|
||||
let timedOut = false;
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
timedOut = true;
|
||||
req.destroy();
|
||||
resolve({ ok: false, error: "Request timeout", code: 408 });
|
||||
}, timeoutMs);
|
||||
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
if (timedOut) return;
|
||||
totalSize += chunk.length;
|
||||
if (totalSize > maxBytes) {
|
||||
clearTimeout(timer);
|
||||
req.destroy();
|
||||
resolve({ ok: false, error: "Payload too large", code: 413 });
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
req.on("end", () => {
|
||||
if (timedOut) return;
|
||||
clearTimeout(timer);
|
||||
try {
|
||||
const body = Buffer.concat(chunks).toString("utf-8");
|
||||
const data = JSON.parse(body);
|
||||
resolve({ ok: true, data });
|
||||
} catch {
|
||||
resolve({ ok: false, error: "Invalid JSON", code: 400 });
|
||||
}
|
||||
});
|
||||
|
||||
req.on("error", () => {
|
||||
if (timedOut) return;
|
||||
clearTimeout(timer);
|
||||
resolve({ ok: false, error: "Connection error", code: 400 });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function parseIpLiteral(value: string | undefined): string | undefined {
|
||||
const trimmed = value?.trim();
|
||||
if (!trimmed) return undefined;
|
||||
|
||||
if (trimmed.startsWith("[")) {
|
||||
const end = trimmed.indexOf("]");
|
||||
if (end !== -1) {
|
||||
const candidate = trimmed.slice(1, end);
|
||||
return net.isIP(candidate) === 0 ? undefined : candidate;
|
||||
}
|
||||
}
|
||||
|
||||
if (net.isIP(trimmed) !== 0) return trimmed;
|
||||
|
||||
const lastColon = trimmed.lastIndexOf(":");
|
||||
if (lastColon > -1 && trimmed.includes(".") && trimmed.indexOf(":") === lastColon) {
|
||||
const candidate = trimmed.slice(0, lastColon);
|
||||
return net.isIP(candidate) === 4 ? candidate : undefined;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function isTrustedProxy(ip: string | undefined, trustedProxies: string[]): boolean {
|
||||
if (!ip || !trustedProxies.length) return false;
|
||||
const parsed = parseIpLiteral(ip);
|
||||
if (!parsed) return false;
|
||||
|
||||
const blockList = new net.BlockList();
|
||||
for (const proxy of trustedProxies) {
|
||||
const trimmed = proxy.trim();
|
||||
if (!trimmed) continue;
|
||||
|
||||
if (trimmed.includes("/")) {
|
||||
const [address, prefix] = trimmed.split("/", 2);
|
||||
const parsedPrefix = Number.parseInt(prefix ?? "", 10);
|
||||
const family = net.isIP(address);
|
||||
if (family === 4 && Number.isInteger(parsedPrefix) && parsedPrefix >= 0 && parsedPrefix <= 32) {
|
||||
blockList.addSubnet(address, parsedPrefix, "ipv4");
|
||||
}
|
||||
if (family === 6 && Number.isInteger(parsedPrefix) && parsedPrefix >= 0 && parsedPrefix <= 128) {
|
||||
blockList.addSubnet(address, parsedPrefix, "ipv6");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (net.isIP(trimmed) === 4) {
|
||||
blockList.addAddress(trimmed, "ipv4");
|
||||
continue;
|
||||
}
|
||||
if (net.isIP(trimmed) === 6) {
|
||||
blockList.addAddress(trimmed, "ipv6");
|
||||
}
|
||||
}
|
||||
|
||||
return blockList.check(parsed, net.isIP(parsed) === 6 ? "ipv6" : "ipv4");
|
||||
}
|
||||
|
||||
function resolveClientIp(req: IncomingMessage, trustedProxies: string[]): string {
|
||||
const remoteAddress = parseIpLiteral(req.socket.remoteAddress);
|
||||
if (!remoteAddress) return "unknown";
|
||||
|
||||
if (!isTrustedProxy(remoteAddress, trustedProxies)) return remoteAddress;
|
||||
|
||||
const forwardedFor = Array.isArray(req.headers["x-forwarded-for"])
|
||||
? req.headers["x-forwarded-for"][0]
|
||||
: req.headers["x-forwarded-for"];
|
||||
|
||||
if (forwardedFor) {
|
||||
const chain = forwardedFor
|
||||
.split(",")
|
||||
.map((entry) => parseIpLiteral(entry))
|
||||
.filter((entry): entry is string => Boolean(entry));
|
||||
|
||||
for (let i = chain.length - 1; i >= 0; i--) {
|
||||
if (!isTrustedProxy(chain[i], trustedProxies)) return chain[i]!;
|
||||
}
|
||||
}
|
||||
|
||||
return "unknown";
|
||||
}
|
||||
|
||||
export function startWebhookServer(opts: {
|
||||
port: number;
|
||||
host: string;
|
||||
path: string;
|
||||
secret: string;
|
||||
trustedProxies: string[];
|
||||
onUpdate: (update: TelegramUpdate) => void;
|
||||
log?: (msg: string) => void;
|
||||
}): { server: ReturnType<typeof createServer>; stop: () => void } {
|
||||
const rateLimiter = createFixedWindowRateLimiter({
|
||||
windowMs: RATE_LIMIT_WINDOW_MS,
|
||||
maxRequests: RATE_LIMIT_MAX_REQUESTS,
|
||||
maxTrackedKeys: RATE_LIMIT_MAX_KEYS,
|
||||
});
|
||||
|
||||
const server = createServer((req, res) => {
|
||||
const respond = (code: number, text = "") => {
|
||||
if (res.headersSent || res.writableEnded) return;
|
||||
res.writeHead(code, { "Content-Type": "text/plain; charset=utf-8" });
|
||||
res.end(text);
|
||||
};
|
||||
|
||||
// Health check
|
||||
if (req.url === "/healthz") {
|
||||
res.writeHead(200);
|
||||
res.end("ok");
|
||||
return;
|
||||
}
|
||||
|
||||
// Only POST to webhook path
|
||||
if (req.url !== opts.path || req.method !== "POST") {
|
||||
res.writeHead(404);
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
// Rate limit
|
||||
const clientIp = resolveClientIp(req, opts.trustedProxies);
|
||||
const rateLimitKey = `${opts.path}:${clientIp}`;
|
||||
if (!rateLimiter.check(rateLimitKey)) {
|
||||
respond(429, "Rate limit exceeded");
|
||||
return;
|
||||
}
|
||||
|
||||
// Validate secret (constant-time)
|
||||
const secretHeader = Array.isArray(req.headers["x-telegram-bot-api-secret-token"])
|
||||
? req.headers["x-telegram-bot-api-secret-token"][0]
|
||||
: req.headers["x-telegram-bot-api-secret-token"];
|
||||
|
||||
if (!constantTimeEqual(secretHeader, opts.secret)) {
|
||||
res.shouldKeepAlive = false;
|
||||
res.setHeader("Connection", "close");
|
||||
respond(401, "Unauthorized");
|
||||
return;
|
||||
}
|
||||
|
||||
// Read body
|
||||
void (async () => {
|
||||
const body = await readJsonBody(req, MAX_BODY_BYTES, READ_TIMEOUT_MS);
|
||||
if (!body.ok) {
|
||||
respond(body.code, body.error);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
opts.onUpdate(body.data as TelegramUpdate);
|
||||
respond(200, "ok");
|
||||
} catch (err) {
|
||||
opts.log?.(`Webhook handler error: ${err}`);
|
||||
respond(500);
|
||||
}
|
||||
})();
|
||||
});
|
||||
|
||||
server.listen(opts.port, opts.host, () => {
|
||||
opts.log?.(`Webhook listening on http://${opts.host}:${opts.port}${opts.path}`);
|
||||
});
|
||||
|
||||
const stop = () => {
|
||||
server.close();
|
||||
};
|
||||
|
||||
return { server, stop };
|
||||
}
|
||||
107
tests/streaming-reply.test.ts
Normal file
107
tests/streaming-reply.test.ts
Normal file
|
|
@ -0,0 +1,107 @@
|
|||
/**
|
||||
* Tests for streaming reply logic.
|
||||
*/
|
||||
|
||||
import { describe, it, expect, vi } from "vitest";
|
||||
import { StreamingReply } from "../src/streaming-reply.js";
|
||||
import type { TelegramAPI } from "../src/telegram-api.js";
|
||||
|
||||
describe("streaming-reply", () => {
|
||||
it("should throttle edits", async () => {
|
||||
const sent: Array<{ type: string; text: string }> = [];
|
||||
const mockApi = {
|
||||
sendMessage: vi.fn(async (_chatId: string, text: string) => {
|
||||
sent.push({ type: "send", text });
|
||||
return { message_id: 1 };
|
||||
}),
|
||||
editMessageText: vi.fn(async (_chatId: string, _msgId: number, text: string) => {
|
||||
sent.push({ type: "edit", text });
|
||||
}),
|
||||
} as unknown as TelegramAPI;
|
||||
|
||||
const streaming = new StreamingReply(mockApi, "123", 1, {
|
||||
throttleMs: 100,
|
||||
minInitialChars: 10,
|
||||
});
|
||||
|
||||
streaming.update("Hello");
|
||||
streaming.update("Hello world");
|
||||
streaming.update("Hello world!!!");
|
||||
|
||||
// Wait for throttle + initial char check
|
||||
await new Promise((resolve) => setTimeout(resolve, 150));
|
||||
|
||||
expect(sent.length).toBeGreaterThan(0);
|
||||
expect(sent[0]?.type).toBe("send");
|
||||
expect(sent[0]?.text).toContain("Hello");
|
||||
|
||||
await streaming.stop();
|
||||
});
|
||||
|
||||
it("should track generation to prevent clobbering", () => {
|
||||
const mockApi = {
|
||||
sendMessage: vi.fn(async () => ({ message_id: 1 })),
|
||||
editMessageText: vi.fn(),
|
||||
} as unknown as TelegramAPI;
|
||||
|
||||
const streaming1 = new StreamingReply(mockApi, "123", 1, { throttleMs: 1000 });
|
||||
const streaming2 = new StreamingReply(mockApi, "123", 2, { throttleMs: 1000 });
|
||||
|
||||
expect(streaming1.getGeneration()).toBe(1);
|
||||
expect(streaming2.getGeneration()).toBe(2);
|
||||
expect(streaming1.getGeneration()).not.toBe(streaming2.getGeneration());
|
||||
});
|
||||
|
||||
it("should truncate text exceeding 4096 chars", async () => {
|
||||
const sent: string[] = [];
|
||||
const mockApi = {
|
||||
sendMessage: vi.fn(async (_chatId: string, text: string) => {
|
||||
sent.push(text);
|
||||
return { message_id: 1 };
|
||||
}),
|
||||
editMessageText: vi.fn(),
|
||||
} as unknown as TelegramAPI;
|
||||
|
||||
const streaming = new StreamingReply(mockApi, "123", 1, {
|
||||
throttleMs: 50,
|
||||
minInitialChars: 0,
|
||||
});
|
||||
|
||||
streaming.update("x".repeat(5000));
|
||||
await streaming.flush();
|
||||
|
||||
expect(sent.length).toBe(1);
|
||||
expect(sent[0]!.length).toBe(4096);
|
||||
|
||||
await streaming.stop();
|
||||
});
|
||||
|
||||
it("should debounce first send based on minInitialChars", async () => {
|
||||
const sent: string[] = [];
|
||||
const mockApi = {
|
||||
sendMessage: vi.fn(async (_chatId: string, text: string) => {
|
||||
sent.push(text);
|
||||
return { message_id: 1 };
|
||||
}),
|
||||
editMessageText: vi.fn(),
|
||||
} as unknown as TelegramAPI;
|
||||
|
||||
const streaming = new StreamingReply(mockApi, "123", 1, {
|
||||
throttleMs: 50,
|
||||
minInitialChars: 20,
|
||||
});
|
||||
|
||||
streaming.update("Short");
|
||||
await streaming.flush();
|
||||
|
||||
// Should not send yet
|
||||
expect(sent.length).toBe(0);
|
||||
|
||||
streaming.update("This is now long enough to send");
|
||||
await streaming.flush();
|
||||
|
||||
expect(sent.length).toBe(1);
|
||||
|
||||
await streaming.stop();
|
||||
});
|
||||
});
|
||||
123
tests/telegram-api.test.ts
Normal file
123
tests/telegram-api.test.ts
Normal file
|
|
@ -0,0 +1,123 @@
|
|||
/**
|
||||
* Tests for Telegram API wrapper.
|
||||
*/
|
||||
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
import { TelegramAPI } from "../src/telegram-api.js";
|
||||
|
||||
describe("telegram-api", () => {
|
||||
const originalFetch = global.fetch;
|
||||
|
||||
beforeEach(() => {
|
||||
global.fetch = vi.fn();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
global.fetch = originalFetch;
|
||||
});
|
||||
|
||||
it("should call getMe", async () => {
|
||||
(global.fetch as ReturnType<typeof vi.fn>).mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: async () => ({ ok: true, result: { id: 123, first_name: "Bot" } }),
|
||||
});
|
||||
|
||||
const api = new TelegramAPI("test-token");
|
||||
const result = await api.getMe();
|
||||
|
||||
expect(result).toEqual({ id: 123, first_name: "Bot" });
|
||||
expect(global.fetch).toHaveBeenCalledWith(
|
||||
"https://api.telegram.org/bottest-token/getMe",
|
||||
expect.objectContaining({ method: "POST" })
|
||||
);
|
||||
});
|
||||
|
||||
it("should call sendMessage", async () => {
|
||||
(global.fetch as ReturnType<typeof vi.fn>).mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: async () => ({ ok: true, result: { message_id: 456 } }),
|
||||
});
|
||||
|
||||
const api = new TelegramAPI("test-token");
|
||||
const result = await api.sendMessage("123", "Hello");
|
||||
|
||||
expect(result).toEqual({ message_id: 456 });
|
||||
expect(global.fetch).toHaveBeenCalledWith(
|
||||
"https://api.telegram.org/bottest-token/sendMessage",
|
||||
expect.objectContaining({
|
||||
method: "POST",
|
||||
body: JSON.stringify({ chat_id: "123", text: "Hello" }),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should call editMessageText", async () => {
|
||||
(global.fetch as ReturnType<typeof vi.fn>).mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: async () => ({ ok: true, result: true }),
|
||||
});
|
||||
|
||||
const api = new TelegramAPI("test-token");
|
||||
await api.editMessageText("123", 456, "Updated");
|
||||
|
||||
expect(global.fetch).toHaveBeenCalledWith(
|
||||
"https://api.telegram.org/bottest-token/editMessageText",
|
||||
expect.objectContaining({
|
||||
method: "POST",
|
||||
body: JSON.stringify({ chat_id: "123", message_id: 456, text: "Updated" }),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should call deleteMessage", async () => {
|
||||
(global.fetch as ReturnType<typeof vi.fn>).mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: async () => ({ ok: true, result: true }),
|
||||
});
|
||||
|
||||
const api = new TelegramAPI("test-token");
|
||||
await api.deleteMessage("123", 456);
|
||||
|
||||
expect(global.fetch).toHaveBeenCalledWith(
|
||||
"https://api.telegram.org/bottest-token/deleteMessage",
|
||||
expect.objectContaining({
|
||||
method: "POST",
|
||||
body: JSON.stringify({ chat_id: "123", message_id: 456 }),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should call setWebhook", async () => {
|
||||
(global.fetch as ReturnType<typeof vi.fn>).mockResolvedValueOnce({
|
||||
ok: true,
|
||||
json: async () => ({ ok: true, result: true }),
|
||||
});
|
||||
|
||||
const api = new TelegramAPI("test-token");
|
||||
await api.setWebhook("https://example.com/webhook", "secret");
|
||||
|
||||
expect(global.fetch).toHaveBeenCalledWith(
|
||||
"https://api.telegram.org/bottest-token/setWebhook",
|
||||
expect.objectContaining({
|
||||
method: "POST",
|
||||
body: JSON.stringify({
|
||||
url: "https://example.com/webhook",
|
||||
secret_token: "secret",
|
||||
allowed_updates: ["message"],
|
||||
}),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should handle API errors", async () => {
|
||||
(global.fetch as ReturnType<typeof vi.fn>).mockResolvedValueOnce({
|
||||
ok: false,
|
||||
status: 401,
|
||||
text: async () => "Unauthorized",
|
||||
});
|
||||
|
||||
const api = new TelegramAPI("bad-token");
|
||||
|
||||
await expect(api.getMe()).rejects.toThrow("Telegram API getMe failed (401): Unauthorized");
|
||||
});
|
||||
});
|
||||
195
tests/webhook-server.test.ts
Normal file
195
tests/webhook-server.test.ts
Normal file
|
|
@ -0,0 +1,195 @@
|
|||
/**
|
||||
* Tests for webhook server security features.
|
||||
*/
|
||||
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { startWebhookServer } from "../src/webhook-server.js";
|
||||
import type { TelegramUpdate } from "../src/types.js";
|
||||
|
||||
describe("webhook-server", () => {
|
||||
it("should reject requests without secret", async () => {
|
||||
const received: TelegramUpdate[] = [];
|
||||
const { server, stop } = startWebhookServer({
|
||||
port: 0, // Random port
|
||||
host: "127.0.0.1",
|
||||
path: "/webhook",
|
||||
secret: "test-secret",
|
||||
trustedProxies: [],
|
||||
onUpdate: (update) => received.push(update),
|
||||
});
|
||||
|
||||
// Wait for server to be listening
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
const address = server.address();
|
||||
const port = address && typeof address === "object" ? address.port : 0;
|
||||
|
||||
try {
|
||||
const res = await fetch(`http://127.0.0.1:${port}/webhook`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ update_id: 1, message: { message_id: 1 } }),
|
||||
});
|
||||
|
||||
expect(res.status).toBe(401);
|
||||
expect(received.length).toBe(0);
|
||||
} finally {
|
||||
stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("should accept requests with valid secret", async () => {
|
||||
const received: TelegramUpdate[] = [];
|
||||
const { server, stop } = startWebhookServer({
|
||||
port: 0,
|
||||
host: "127.0.0.1",
|
||||
path: "/webhook",
|
||||
secret: "test-secret",
|
||||
trustedProxies: [],
|
||||
onUpdate: (update) => received.push(update),
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
const address = server.address();
|
||||
const port = address && typeof address === "object" ? address.port : 0;
|
||||
|
||||
try {
|
||||
const update: TelegramUpdate = {
|
||||
update_id: 1,
|
||||
message: {
|
||||
message_id: 1,
|
||||
chat: { id: 123, type: "private" },
|
||||
date: Date.now(),
|
||||
text: "hello",
|
||||
},
|
||||
};
|
||||
|
||||
const res = await fetch(`http://127.0.0.1:${port}/webhook`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"X-Telegram-Bot-Api-Secret-Token": "test-secret",
|
||||
},
|
||||
body: JSON.stringify(update),
|
||||
});
|
||||
|
||||
expect(res.status).toBe(200);
|
||||
expect(received.length).toBe(1);
|
||||
expect(received[0]?.update_id).toBe(1);
|
||||
} finally {
|
||||
stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("should reject payloads exceeding body limit", async () => {
|
||||
const received: TelegramUpdate[] = [];
|
||||
const { server, stop } = startWebhookServer({
|
||||
port: 0,
|
||||
host: "127.0.0.1",
|
||||
path: "/webhook",
|
||||
secret: "test-secret",
|
||||
trustedProxies: [],
|
||||
onUpdate: (update) => received.push(update),
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
const address = server.address();
|
||||
const port = address && typeof address === "object" ? address.port : 0;
|
||||
|
||||
try {
|
||||
const largePayload = JSON.stringify({
|
||||
update_id: 1,
|
||||
message: { text: "x".repeat(2_000_000) }, // > 1MB
|
||||
});
|
||||
|
||||
// When payload exceeds limit, server may respond with 413 or destroy connection
|
||||
let rejected = false;
|
||||
try {
|
||||
const res = await fetch(`http://127.0.0.1:${port}/webhook`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"X-Telegram-Bot-Api-Secret-Token": "test-secret",
|
||||
},
|
||||
body: largePayload,
|
||||
});
|
||||
rejected = res.status === 413;
|
||||
} catch (err: any) {
|
||||
// Connection destroyed (ECONNRESET) is also acceptable
|
||||
rejected = err.code === "ECONNRESET" || err.message?.includes("ECONNRESET") || err.cause?.code === "ECONNRESET";
|
||||
}
|
||||
|
||||
expect(rejected).toBe(true);
|
||||
expect(received.length).toBe(0);
|
||||
} finally {
|
||||
stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("should enforce rate limits per IP", async () => {
|
||||
const received: TelegramUpdate[] = [];
|
||||
const { server, stop } = startWebhookServer({
|
||||
port: 0,
|
||||
host: "127.0.0.1",
|
||||
path: "/webhook",
|
||||
secret: "test-secret",
|
||||
trustedProxies: [],
|
||||
onUpdate: (update) => received.push(update),
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
const address = server.address();
|
||||
const port = address && typeof address === "object" ? address.port : 0;
|
||||
|
||||
try {
|
||||
// Make 65 requests (rate limit is 60/min)
|
||||
const requests = [];
|
||||
for (let i = 0; i < 65; i++) {
|
||||
requests.push(
|
||||
fetch(`http://127.0.0.1:${port}/webhook`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"X-Telegram-Bot-Api-Secret-Token": "test-secret",
|
||||
},
|
||||
body: JSON.stringify({ update_id: i }),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
const results = await Promise.all(requests);
|
||||
const statuses = results.map((r) => r.status);
|
||||
|
||||
// First 60 should succeed, rest should be rate limited
|
||||
const ok = statuses.filter((s) => s === 200).length;
|
||||
const rateLimited = statuses.filter((s) => s === 429).length;
|
||||
|
||||
expect(ok).toBe(60);
|
||||
expect(rateLimited).toBe(5);
|
||||
} finally {
|
||||
stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("should respond to /healthz", async () => {
|
||||
const { server, stop } = startWebhookServer({
|
||||
port: 0,
|
||||
host: "127.0.0.1",
|
||||
path: "/webhook",
|
||||
secret: "test-secret",
|
||||
trustedProxies: [],
|
||||
onUpdate: () => {},
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
const address = server.address();
|
||||
const port = address && typeof address === "object" ? address.port : 0;
|
||||
|
||||
try {
|
||||
const res = await fetch(`http://127.0.0.1:${port}/healthz`);
|
||||
expect(res.status).toBe(200);
|
||||
expect(await res.text()).toBe("ok");
|
||||
} finally {
|
||||
stop();
|
||||
}
|
||||
});
|
||||
});
|
||||
21
tsconfig.json
Normal file
21
tsconfig.json
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
{
|
||||
"compilerOptions": {
|
||||
"target": "ES2022",
|
||||
"module": "ESNext",
|
||||
"moduleResolution": "bundler",
|
||||
"lib": ["ES2022"],
|
||||
"outDir": "./dist",
|
||||
"rootDir": "./src",
|
||||
"declaration": true,
|
||||
"declarationMap": true,
|
||||
"sourceMap": true,
|
||||
"strict": true,
|
||||
"esModuleInterop": true,
|
||||
"skipLibCheck": true,
|
||||
"forceConsistentCasingInFileNames": true,
|
||||
"resolveJsonModule": true,
|
||||
"types": ["node", "vitest/globals"]
|
||||
},
|
||||
"include": ["src/**/*"],
|
||||
"exclude": ["node_modules", "dist", "tests"]
|
||||
}
|
||||
8
vitest.config.ts
Normal file
8
vitest.config.ts
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
import { defineConfig } from "vitest/config";
|
||||
|
||||
export default defineConfig({
|
||||
test: {
|
||||
globals: true,
|
||||
environment: "node",
|
||||
},
|
||||
});
|
||||
Loading…
Add table
Add a link
Reference in a new issue