openclaw-vainplex/src/telegram/monitor.ts
Muhammed Mukhthar CM 1a41fecf67 feat(telegram): use grammyjs/runner for concurrent update processing
Previously, grammY's default bot.start() processed updates sequentially,
blocking all Telegram messages while one was being handled. This made
maxConcurrent settings ineffective for Telegram.

Now uses @grammyjs/runner which processes updates concurrently, matching
the behavior of Discord (Promise.all) and WhatsApp (fire-and-forget).

Benefits:
- Ack reactions (👀) appear immediately, not after queue clears
- Multiple chats can be processed in parallel
- maxConcurrent setting now works correctly for Telegram
- Long-running tool calls no longer block other conversations
2026-01-07 22:08:20 +01:00

80 lines
2 KiB
TypeScript

import { run } from "@grammyjs/runner";
import { loadConfig } from "../config/config.js";
import type { RuntimeEnv } from "../runtime.js";
import { createTelegramBot } from "./bot.js";
import { makeProxyFetch } from "./proxy.js";
import { resolveTelegramToken } from "./token.js";
import { startTelegramWebhook } from "./webhook.js";
export type MonitorTelegramOpts = {
token?: string;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
useWebhook?: boolean;
webhookPath?: string;
webhookPort?: number;
webhookSecret?: string;
proxyFetch?: typeof fetch;
webhookUrl?: string;
};
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
const { token } = resolveTelegramToken(loadConfig(), {
envToken: opts.token,
});
if (!token) {
throw new Error(
"TELEGRAM_BOT_TOKEN or telegram.botToken/tokenFile is required for Telegram gateway",
);
}
const proxyFetch =
opts.proxyFetch ??
(loadConfig().telegram?.proxy
? makeProxyFetch(loadConfig().telegram?.proxy as string)
: undefined);
const bot = createTelegramBot({
token,
runtime: opts.runtime,
proxyFetch,
});
if (opts.useWebhook) {
await startTelegramWebhook({
token,
path: opts.webhookPath,
port: opts.webhookPort,
secret: opts.webhookSecret,
runtime: opts.runtime as RuntimeEnv,
fetch: proxyFetch,
abortSignal: opts.abortSignal,
publicUrl: opts.webhookUrl,
});
return;
}
// Use grammyjs/runner for concurrent update processing
const runner = run(bot, {
runner: {
fetch: {
// Match grammY defaults
timeout: 30,
},
},
});
const stopOnAbort = () => {
if (opts.abortSignal?.aborted) {
runner.stop();
}
};
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
try {
// runner.task() returns a promise that resolves when the runner stops
await runner.task();
} finally {
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
}
}