From 37721ebd7ca95205973023947016cc772aed4a27 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sat, 31 Jan 2026 21:55:59 +0530 Subject: [PATCH] fix: restore telegram draft streaming partials --- docs/channels/telegram.md | 12 +++ ...pi-embedded-subscribe.handlers.messages.ts | 59 +++++----- .../pi-embedded-subscribe.handlers.types.ts | 5 + .../pi-embedded-subscribe.reply-tags.test.ts | 46 ++++++++ src/agents/pi-embedded-subscribe.ts | 7 ++ src/telegram/bot-message-context.ts | 6 +- src/telegram/bot-message-dispatch.test.ts | 101 ++++++++++++++++++ src/telegram/bot-message-dispatch.ts | 22 ++-- src/telegram/bot.ts | 8 +- src/telegram/bot/helpers.ts | 7 +- src/telegram/draft-stream.test.ts | 35 ++++++ src/telegram/draft-stream.ts | 6 +- 12 files changed, 260 insertions(+), 54 deletions(-) create mode 100644 src/telegram/bot-message-dispatch.test.ts create mode 100644 src/telegram/draft-stream.test.ts diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index 38e86d6b8..e454553e5 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -109,6 +109,18 @@ group messages, so use admin if you need full visibility. - Long-polling uses grammY runner with per-chat sequencing; overall concurrency is capped by `agents.defaults.maxConcurrent`. - Telegram Bot API does not support read receipts; there is no `sendReadReceipts` option. +## Draft streaming + +OpenClaw can stream partial replies in Telegram DMs using `sendMessageDraft`. + +Requirements: + +- Threaded Mode enabled for the bot in @BotFather (forum topic mode). +- Private chat threads only (Telegram includes `message_thread_id` on inbound messages). +- `channels.telegram.streamMode` not set to `"off"` (default: `"partial"`, `"block"` enables chunked draft updates). + +Draft streaming is DM-only; Telegram does not support it in groups or channels. + ## Formatting (Telegram HTML) - Outbound Telegram text uses `parse_mode: "HTML"` (Telegram’s supported tag subset). diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index e89d254c6..54b4edd1a 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -1,6 +1,5 @@ import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core"; -import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js"; import { emitAgentEvent } from "../infra/agent-events.js"; import { isMessagingToolDuplicateNormalized, @@ -111,35 +110,39 @@ export function handleMessageUpdate( }) .trim(); if (next && next !== ctx.state.lastStreamedAssistant) { - const previousText = ctx.state.lastStreamedAssistant ?? ""; - const { text: cleanedText, mediaUrls } = parseReplyDirectives(next); - const { text: previousCleanedText } = parseReplyDirectives(previousText); - if (cleanedText.startsWith(previousCleanedText)) { - const deltaText = cleanedText.slice(previousCleanedText.length); + const parsedDelta = chunk ? ctx.consumePartialReplyDirectives(chunk) : null; + const deltaText = parsedDelta?.text ?? ""; + const mediaUrls = parsedDelta?.mediaUrls; + if (!deltaText && (!mediaUrls || mediaUrls.length === 0) && !parsedDelta?.audioAsVoice) { ctx.state.lastStreamedAssistant = next; - emitAgentEvent({ - runId: ctx.params.runId, - stream: "assistant", - data: { - text: cleanedText, - delta: deltaText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, + return; + } + const previousCleaned = ctx.state.lastStreamedAssistantCleaned ?? ""; + const cleanedText = `${previousCleaned}${deltaText}`; + ctx.state.lastStreamedAssistant = next; + ctx.state.lastStreamedAssistantCleaned = cleanedText; + emitAgentEvent({ + runId: ctx.params.runId, + stream: "assistant", + data: { + text: cleanedText, + delta: deltaText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, + }); + void ctx.params.onAgentEvent?.({ + stream: "assistant", + data: { + text: cleanedText, + delta: deltaText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + }, + }); + if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) { + void ctx.params.onPartialReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, }); - void ctx.params.onAgentEvent?.({ - stream: "assistant", - data: { - text: cleanedText, - delta: deltaText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }, - }); - if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) { - void ctx.params.onPartialReply({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - }); - } } } diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 4a464c5e2..92869fda3 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -38,6 +38,7 @@ export type EmbeddedPiSubscribeState = { blockBuffer: string; blockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState }; lastStreamedAssistant?: string; + lastStreamedAssistantCleaned?: string; lastStreamedReasoning?: string; lastBlockReplyText?: string; assistantMessageIndex: number; @@ -82,6 +83,10 @@ export type EmbeddedPiSubscribeContext = { text: string, options?: { final?: boolean }, ) => ReplyDirectiveParseResult | null; + consumePartialReplyDirectives: ( + text: string, + options?: { final?: boolean }, + ) => ReplyDirectiveParseResult | null; resetAssistantMessageState: (nextAssistantTextBaseline: number) => void; resetForCompactionRetry: () => void; finalizeAssistantTexts: (args: { diff --git a/src/agents/pi-embedded-subscribe.reply-tags.test.ts b/src/agents/pi-embedded-subscribe.reply-tags.test.ts index 5243c8488..7495b7f6f 100644 --- a/src/agents/pi-embedded-subscribe.reply-tags.test.ts +++ b/src/agents/pi-embedded-subscribe.reply-tags.test.ts @@ -103,4 +103,50 @@ describe("subscribeEmbeddedPiSession reply tags", () => { expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello"); expect(onBlockReply.mock.calls[1]?.[0]?.text).toBe("[["); }); + + it("streams partial replies past reply_to tags split across chunks", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onPartialReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + onPartialReply, + }); + + handler?.({ type: "message_start", message: { role: "assistant" } }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: "[[reply_to:1897" }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: "]] Hello" }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_delta", delta: " world" }, + }); + handler?.({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { type: "text_end" }, + }); + + const lastPayload = onPartialReply.mock.calls.at(-1)?.[0]; + expect(lastPayload?.text).toBe("Hello world"); + for (const call of onPartialReply.mock.calls) { + expect(call[0]?.text?.includes("[[reply_to")).toBe(false); + } + }); }); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 2e58b4af3..5d04f8f8e 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -47,6 +47,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar // Track if a streamed chunk opened a block (stateful across chunks). blockState: { thinking: false, final: false, inlineCode: createInlineCodeState() }, lastStreamedAssistant: undefined, + lastStreamedAssistantCleaned: undefined, lastStreamedReasoning: undefined, lastBlockReplyText: undefined, assistantMessageIndex: 0, @@ -77,16 +78,19 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const pendingMessagingTexts = state.pendingMessagingTexts; const pendingMessagingTargets = state.pendingMessagingTargets; const replyDirectiveAccumulator = createStreamingDirectiveAccumulator(); + const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator(); const resetAssistantMessageState = (nextAssistantTextBaseline: number) => { state.deltaBuffer = ""; state.blockBuffer = ""; blockChunker?.reset(); replyDirectiveAccumulator.reset(); + partialReplyDirectiveAccumulator.reset(); state.blockState.thinking = false; state.blockState.final = false; state.blockState.inlineCode = createInlineCodeState(); state.lastStreamedAssistant = undefined; + state.lastStreamedAssistantCleaned = undefined; state.lastBlockReplyText = undefined; state.lastStreamedReasoning = undefined; state.lastReasoningSent = undefined; @@ -447,6 +451,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const consumeReplyDirectives = (text: string, options?: { final?: boolean }) => replyDirectiveAccumulator.consume(text, options); + const consumePartialReplyDirectives = (text: string, options?: { final?: boolean }) => + partialReplyDirectiveAccumulator.consume(text, options); const flushBlockReplyBuffer = () => { if (!params.onBlockReply) { @@ -509,6 +515,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar flushBlockReplyBuffer, emitReasoningStream, consumeReplyDirectives, + consumePartialReplyDirectives, resetAssistantMessageState, resetForCompactionRetry, finalizeAssistantTexts, diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index b86aa2d60..06b6dc0aa 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -163,6 +163,7 @@ export const buildTelegramMessageContext = async ({ isForum, messageThreadId, }); + const replyThreadId = isGroup ? resolvedThreadId : messageThreadId; const { groupConfig, topicConfig } = resolveTelegramGroupConfig(chatId, resolvedThreadId); const peerId = isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId); const route = resolveAgentRoute({ @@ -205,7 +206,7 @@ export const buildTelegramMessageContext = async ({ const sendTyping = async () => { await withTelegramApiErrorLogging({ operation: "sendChatAction", - fn: () => bot.api.sendChatAction(chatId, "typing", buildTypingThreadParams(resolvedThreadId)), + fn: () => bot.api.sendChatAction(chatId, "typing", buildTypingThreadParams(replyThreadId)), }); }; @@ -214,7 +215,7 @@ export const buildTelegramMessageContext = async ({ await withTelegramApiErrorLogging({ operation: "sendChatAction", fn: () => - bot.api.sendChatAction(chatId, "record_voice", buildTypingThreadParams(resolvedThreadId)), + bot.api.sendChatAction(chatId, "record_voice", buildTypingThreadParams(replyThreadId)), }); } catch (err) { logVerbose(`telegram record_voice cue failed for chat ${chatId}: ${String(err)}`); @@ -672,6 +673,7 @@ export const buildTelegramMessageContext = async ({ chatId, isGroup, resolvedThreadId, + replyThreadId, isForum, historyKey, historyLimit, diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts new file mode 100644 index 000000000..2916ca21b --- /dev/null +++ b/src/telegram/bot-message-dispatch.test.ts @@ -0,0 +1,101 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const createTelegramDraftStream = vi.hoisted(() => vi.fn()); +const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() => vi.fn()); +const deliverReplies = vi.hoisted(() => vi.fn()); + +vi.mock("./draft-stream.js", () => ({ + createTelegramDraftStream, +})); + +vi.mock("../auto-reply/reply/provider-dispatcher.js", () => ({ + dispatchReplyWithBufferedBlockDispatcher, +})); + +vi.mock("./bot/delivery.js", () => ({ + deliverReplies, +})); + +vi.mock("./sticker-cache.js", () => ({ + cacheSticker: vi.fn(), + describeStickerImage: vi.fn(), +})); + +import { dispatchTelegramMessage } from "./bot-message-dispatch.js"; + +describe("dispatchTelegramMessage draft streaming", () => { + beforeEach(() => { + createTelegramDraftStream.mockReset(); + dispatchReplyWithBufferedBlockDispatcher.mockReset(); + deliverReplies.mockReset(); + }); + + it("streams drafts in private threads and forwards thread id", async () => { + const draftStream = { + update: vi.fn(), + flush: vi.fn().mockResolvedValue(undefined), + stop: vi.fn(), + }; + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Hello" }); + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + const resolveBotTopicsEnabled = vi.fn().mockResolvedValue(true); + const context = { + ctxPayload: {}, + primaryCtx: { message: { chat: { id: 123, type: "private" } } }, + msg: { + chat: { id: 123, type: "private" }, + message_id: 456, + message_thread_id: 777, + }, + chatId: 123, + isGroup: false, + resolvedThreadId: undefined, + replyThreadId: 777, + historyKey: undefined, + historyLimit: 0, + groupHistories: new Map(), + route: { agentId: "default", accountId: "default" }, + skillFilter: undefined, + sendTyping: vi.fn(), + sendRecordVoice: vi.fn(), + ackReactionPromise: null, + reactionApi: null, + removeAckAfterReply: false, + }; + + await dispatchTelegramMessage({ + context, + bot: { api: {} }, + cfg: {}, + runtime: {}, + replyToMode: "first", + streamMode: "partial", + textLimit: 4096, + telegramCfg: {}, + opts: {}, + resolveBotTopicsEnabled, + }); + + expect(resolveBotTopicsEnabled).toHaveBeenCalledWith(context.primaryCtx); + expect(createTelegramDraftStream).toHaveBeenCalledWith( + expect.objectContaining({ + chatId: 123, + messageThreadId: 777, + }), + ); + expect(draftStream.update).toHaveBeenCalledWith("Hello"); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + messageThreadId: 777, + }), + ); + }); +}); diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 6cc626a07..e66ec2cf5 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -1,10 +1,10 @@ // @ts-nocheck -import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js"; import { findModelInCatalog, loadModelCatalog, modelSupportsVision, } from "../agents/model-catalog.js"; +import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js"; import { resolveDefaultModelForAgent } from "../agents/model-selection.js"; import { resolveChunkMode } from "../auto-reply/chunk.js"; import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js"; @@ -55,7 +55,7 @@ export const dispatchTelegramMessage = async ({ msg, chatId, isGroup, - resolvedThreadId, + replyThreadId, historyKey, historyLimit, groupHistories, @@ -69,11 +69,12 @@ export const dispatchTelegramMessage = async ({ } = context; const isPrivateChat = msg.chat.type === "private"; + const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id; const draftMaxChars = Math.min(textLimit, 4096); const canStreamDraft = streamMode !== "off" && isPrivateChat && - typeof resolvedThreadId === "number" && + typeof messageThreadId === "number" && (await resolveBotTopicsEnabled(primaryCtx)); const draftStream = canStreamDraft ? createTelegramDraftStream({ @@ -81,7 +82,7 @@ export const dispatchTelegramMessage = async ({ chatId, draftId: msg.message_id || Date.now(), maxChars: draftMaxChars, - messageThreadId: resolvedThreadId, + messageThreadId: replyThreadId, log: logVerbose, warn: logVerbose, }) @@ -240,7 +241,7 @@ export const dispatchTelegramMessage = async ({ bot, replyToMode, textLimit, - messageThreadId: resolvedThreadId, + messageThreadId: replyThreadId, tableMode, chunkMode, onVoiceRecording: sendRecordVoice, @@ -273,15 +274,8 @@ export const dispatchTelegramMessage = async ({ }, replyOptions: { skillFilter, - onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined, - onReasoningStream: draftStream - ? (payload) => { - if (payload.text) { - draftStream.update(payload.text); - } - } - : undefined, disableBlockStreaming, + onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined, onModelSelected: (ctx) => { prefixContext.onModelSelected(ctx); }, @@ -298,7 +292,7 @@ export const dispatchTelegramMessage = async ({ bot, replyToMode, textLimit, - messageThreadId: resolvedThreadId, + messageThreadId: replyThreadId, tableMode, chunkMode, linkPreview: telegramCfg.linkPreview, diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index c416f9776..f4416ca30 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -24,7 +24,6 @@ import { createSubsystemLogger } from "../logging/subsystem.js"; import { formatUncaughtError } from "../infra/errors.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; -import { withTelegramApiErrorLogging } from "./api-logging.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; import { resolveThreadSessionKeys } from "../routing/session-key.js"; import type { RuntimeEnv } from "../runtime.js"; @@ -44,6 +43,7 @@ import { resolveTelegramUpdateId, type TelegramUpdateKeyContext, } from "./bot-updates.js"; +import { withTelegramApiErrorLogging } from "./api-logging.js"; import { resolveTelegramFetch } from "./fetch.js"; import { wasSentByBot } from "./sent-message-cache.js"; @@ -245,7 +245,6 @@ export function createTelegramBot(opts: TelegramBotOptions) { : undefined) ?? (opts.allowFrom && opts.allowFrom.length > 0 ? opts.allowFrom : undefined); const replyToMode = opts.replyToMode ?? telegramCfg.replyToMode ?? "first"; - const streamMode = resolveTelegramStreamMode(telegramCfg); const nativeEnabled = resolveNativeCommandsEnabled({ providerId: "telegram", providerSetting: telegramCfg.commands?.native, @@ -264,6 +263,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { const ackReactionScope = cfg.messages?.ackReactionScope ?? "group-mentions"; const mediaMaxBytes = (opts.mediaMaxMb ?? telegramCfg.mediaMaxMb ?? 5) * 1024 * 1024; const logger = getChildLogger({ module: "telegram-auto-reply" }); + const streamMode = resolveTelegramStreamMode(telegramCfg); let botHasTopicsEnabled: boolean | undefined; const resolveBotTopicsEnabled = async (ctx?: TelegramContext) => { const fromCtx = ctx?.me as { has_topics_enabled?: boolean } | undefined; @@ -274,6 +274,10 @@ export function createTelegramBot(opts: TelegramBotOptions) { if (typeof botHasTopicsEnabled === "boolean") { return botHasTopicsEnabled; } + if (typeof bot.api.getMe !== "function") { + botHasTopicsEnabled = false; + return botHasTopicsEnabled; + } try { const me = (await withTelegramApiErrorLogging({ operation: "getMe", diff --git a/src/telegram/bot/helpers.ts b/src/telegram/bot/helpers.ts index 17451387c..1ee479be3 100644 --- a/src/telegram/bot/helpers.ts +++ b/src/telegram/bot/helpers.ts @@ -1,5 +1,4 @@ import { formatLocationText, type NormalizedLocation } from "../../channels/location.js"; -import type { TelegramAccountConfig } from "../../config/types.telegram.js"; import type { TelegramForwardChat, TelegramForwardOrigin, @@ -61,9 +60,9 @@ export function buildTypingThreadParams(messageThreadId?: number) { return { message_thread_id: Math.trunc(messageThreadId) }; } -export function resolveTelegramStreamMode( - telegramCfg: Pick | undefined, -): TelegramStreamMode { +export function resolveTelegramStreamMode(telegramCfg?: { + streamMode?: TelegramStreamMode; +}): TelegramStreamMode { const raw = telegramCfg?.streamMode?.trim().toLowerCase(); if (raw === "off" || raw === "partial" || raw === "block") { return raw; diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts new file mode 100644 index 000000000..920951e86 --- /dev/null +++ b/src/telegram/draft-stream.test.ts @@ -0,0 +1,35 @@ +import { describe, expect, it, vi } from "vitest"; + +import { createTelegramDraftStream } from "./draft-stream.js"; + +describe("createTelegramDraftStream", () => { + it("passes message_thread_id when provided", () => { + const api = { sendMessageDraft: vi.fn().mockResolvedValue(true) }; + const stream = createTelegramDraftStream({ + api: api as any, + chatId: 123, + draftId: 42, + messageThreadId: 99, + }); + + stream.update("Hello"); + + expect(api.sendMessageDraft).toHaveBeenCalledWith(123, 42, "Hello", { + message_thread_id: 99, + }); + }); + + it("omits message_thread_id for general topic id", () => { + const api = { sendMessageDraft: vi.fn().mockResolvedValue(true) }; + const stream = createTelegramDraftStream({ + api: api as any, + chatId: 123, + draftId: 42, + messageThreadId: 1, + }); + + stream.update("Hello"); + + expect(api.sendMessageDraft).toHaveBeenCalledWith(123, 42, "Hello", undefined); + }); +}); diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 3d63587e6..4eb121e42 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -1,4 +1,5 @@ import type { Bot } from "grammy"; +import { buildTelegramThreadParams } from "./bot/helpers.js"; const TELEGRAM_DRAFT_MAX_CHARS = 4096; const DEFAULT_THROTTLE_MS = 300; @@ -24,10 +25,7 @@ export function createTelegramDraftStream(params: { const rawDraftId = Number.isFinite(params.draftId) ? Math.trunc(params.draftId) : 1; const draftId = rawDraftId === 0 ? 1 : Math.abs(rawDraftId); const chatId = params.chatId; - const threadParams = - typeof params.messageThreadId === "number" - ? { message_thread_id: Math.trunc(params.messageThreadId) } - : undefined; + const threadParams = buildTelegramThreadParams(params.messageThreadId); let lastSentText = ""; let lastSentAt = 0;