fix: restore telegram draft streaming partials

This commit is contained in:
Ayaan Zaidi 2026-01-31 21:55:59 +05:30 committed by Ayaan Zaidi
parent 35988d77ec
commit 37721ebd7c
12 changed files with 260 additions and 54 deletions

View file

@ -109,6 +109,18 @@ group messages, so use admin if you need full visibility.
- Long-polling uses grammY runner with per-chat sequencing; overall concurrency is capped by `agents.defaults.maxConcurrent`.
- Telegram Bot API does not support read receipts; there is no `sendReadReceipts` option.
## Draft streaming
OpenClaw can stream partial replies in Telegram DMs using `sendMessageDraft`.
Requirements:
- Threaded Mode enabled for the bot in @BotFather (forum topic mode).
- Private chat threads only (Telegram includes `message_thread_id` on inbound messages).
- `channels.telegram.streamMode` not set to `"off"` (default: `"partial"`; `"block"` enables chunked draft updates).
Draft streaming is DM-only; Telegram does not support it in groups or channels.
## Formatting (Telegram HTML)
- Outbound Telegram text uses `parse_mode: "HTML"` (Telegram's supported tag subset).

View file

@ -1,6 +1,5 @@
import type { AgentEvent, AgentMessage } from "@mariozechner/pi-agent-core";
import { parseReplyDirectives } from "../auto-reply/reply/reply-directives.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import {
isMessagingToolDuplicateNormalized,
@ -111,35 +110,39 @@ export function handleMessageUpdate(
})
.trim();
if (next && next !== ctx.state.lastStreamedAssistant) {
const previousText = ctx.state.lastStreamedAssistant ?? "";
const { text: cleanedText, mediaUrls } = parseReplyDirectives(next);
const { text: previousCleanedText } = parseReplyDirectives(previousText);
if (cleanedText.startsWith(previousCleanedText)) {
const deltaText = cleanedText.slice(previousCleanedText.length);
const parsedDelta = chunk ? ctx.consumePartialReplyDirectives(chunk) : null;
const deltaText = parsedDelta?.text ?? "";
const mediaUrls = parsedDelta?.mediaUrls;
if (!deltaText && (!mediaUrls || mediaUrls.length === 0) && !parsedDelta?.audioAsVoice) {
ctx.state.lastStreamedAssistant = next;
emitAgentEvent({
runId: ctx.params.runId,
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
return;
}
const previousCleaned = ctx.state.lastStreamedAssistantCleaned ?? "";
const cleanedText = `${previousCleaned}${deltaText}`;
ctx.state.lastStreamedAssistant = next;
ctx.state.lastStreamedAssistantCleaned = cleanedText;
emitAgentEvent({
runId: ctx.params.runId,
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
void ctx.params.onAgentEvent?.({
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) {
void ctx.params.onPartialReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
void ctx.params.onAgentEvent?.({
stream: "assistant",
data: {
text: cleanedText,
delta: deltaText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
},
});
if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) {
void ctx.params.onPartialReply({
text: cleanedText,
mediaUrls: mediaUrls?.length ? mediaUrls : undefined,
});
}
}
}

View file

@ -38,6 +38,7 @@ export type EmbeddedPiSubscribeState = {
blockBuffer: string;
blockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState };
lastStreamedAssistant?: string;
lastStreamedAssistantCleaned?: string;
lastStreamedReasoning?: string;
lastBlockReplyText?: string;
assistantMessageIndex: number;
@ -82,6 +83,10 @@ export type EmbeddedPiSubscribeContext = {
text: string,
options?: { final?: boolean },
) => ReplyDirectiveParseResult | null;
consumePartialReplyDirectives: (
text: string,
options?: { final?: boolean },
) => ReplyDirectiveParseResult | null;
resetAssistantMessageState: (nextAssistantTextBaseline: number) => void;
resetForCompactionRetry: () => void;
finalizeAssistantTexts: (args: {

View file

@ -103,4 +103,50 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
expect(onBlockReply.mock.calls[0]?.[0]?.text).toBe("Hello");
expect(onBlockReply.mock.calls[1]?.[0]?.text).toBe("[[");
});
// Regression: a `[[reply_to:…]]` directive that arrives split across two text
// deltas must not show up in any partial reply, and the text that follows it
// must still be streamed.
it("streams partial replies past reply_to tags split across chunks", () => {
  let emit: ((evt: unknown) => void) | undefined;
  const stubSession: StubSession = {
    subscribe: (listener) => {
      // Capture the subscriber so the test can push events into it directly.
      emit = listener;
      return () => {};
    },
  };
  const onPartialReply = vi.fn();

  subscribeEmbeddedPiSession({
    session: stubSession as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
    runId: "run",
    onPartialReply,
  });

  // Wraps an assistant sub-event in the `message_update` envelope.
  const assistantUpdate = (assistantMessageEvent: Record<string, unknown>) =>
    emit?.({
      type: "message_update",
      message: { role: "assistant" },
      assistantMessageEvent,
    });

  emit?.({ type: "message_start", message: { role: "assistant" } });
  // The directive is cut in the middle: the closing `]]` arrives with the second delta.
  for (const delta of ["[[reply_to:1897", "]] Hello", " world"]) {
    assistantUpdate({ type: "text_delta", delta });
  }
  assistantUpdate({ type: "text_end" });

  const partials = onPartialReply.mock.calls.map((args) => args[0]);
  expect(partials.at(-1)?.text).toBe("Hello world");
  partials.forEach((payload) => {
    expect(payload?.text?.includes("[[reply_to")).toBe(false);
  });
});
});

View file

@ -47,6 +47,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
// Track if a streamed chunk opened a <think> block (stateful across chunks).
blockState: { thinking: false, final: false, inlineCode: createInlineCodeState() },
lastStreamedAssistant: undefined,
lastStreamedAssistantCleaned: undefined,
lastStreamedReasoning: undefined,
lastBlockReplyText: undefined,
assistantMessageIndex: 0,
@ -77,16 +78,19 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const pendingMessagingTexts = state.pendingMessagingTexts;
const pendingMessagingTargets = state.pendingMessagingTargets;
const replyDirectiveAccumulator = createStreamingDirectiveAccumulator();
const partialReplyDirectiveAccumulator = createStreamingDirectiveAccumulator();
const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {
state.deltaBuffer = "";
state.blockBuffer = "";
blockChunker?.reset();
replyDirectiveAccumulator.reset();
partialReplyDirectiveAccumulator.reset();
state.blockState.thinking = false;
state.blockState.final = false;
state.blockState.inlineCode = createInlineCodeState();
state.lastStreamedAssistant = undefined;
state.lastStreamedAssistantCleaned = undefined;
state.lastBlockReplyText = undefined;
state.lastStreamedReasoning = undefined;
state.lastReasoningSent = undefined;
@ -447,6 +451,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const consumeReplyDirectives = (text: string, options?: { final?: boolean }) =>
replyDirectiveAccumulator.consume(text, options);
const consumePartialReplyDirectives = (text: string, options?: { final?: boolean }) =>
partialReplyDirectiveAccumulator.consume(text, options);
const flushBlockReplyBuffer = () => {
if (!params.onBlockReply) {
@ -509,6 +515,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
flushBlockReplyBuffer,
emitReasoningStream,
consumeReplyDirectives,
consumePartialReplyDirectives,
resetAssistantMessageState,
resetForCompactionRetry,
finalizeAssistantTexts,

View file

@ -163,6 +163,7 @@ export const buildTelegramMessageContext = async ({
isForum,
messageThreadId,
});
const replyThreadId = isGroup ? resolvedThreadId : messageThreadId;
const { groupConfig, topicConfig } = resolveTelegramGroupConfig(chatId, resolvedThreadId);
const peerId = isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId);
const route = resolveAgentRoute({
@ -205,7 +206,7 @@ export const buildTelegramMessageContext = async ({
const sendTyping = async () => {
await withTelegramApiErrorLogging({
operation: "sendChatAction",
fn: () => bot.api.sendChatAction(chatId, "typing", buildTypingThreadParams(resolvedThreadId)),
fn: () => bot.api.sendChatAction(chatId, "typing", buildTypingThreadParams(replyThreadId)),
});
};
@ -214,7 +215,7 @@ export const buildTelegramMessageContext = async ({
await withTelegramApiErrorLogging({
operation: "sendChatAction",
fn: () =>
bot.api.sendChatAction(chatId, "record_voice", buildTypingThreadParams(resolvedThreadId)),
bot.api.sendChatAction(chatId, "record_voice", buildTypingThreadParams(replyThreadId)),
});
} catch (err) {
logVerbose(`telegram record_voice cue failed for chat ${chatId}: ${String(err)}`);
@ -672,6 +673,7 @@ export const buildTelegramMessageContext = async ({
chatId,
isGroup,
resolvedThreadId,
replyThreadId,
isForum,
historyKey,
historyLimit,

View file

@ -0,0 +1,101 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
// Mock functions are created through vi.hoisted so that the vi.mock factories
// below can reference them.
const createTelegramDraftStream = vi.hoisted(() => vi.fn());
const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() => vi.fn());
const deliverReplies = vi.hoisted(() => vi.fn());
// Draft stream factory: replaced so the test can inspect the arguments it receives.
vi.mock("./draft-stream.js", () => ({
createTelegramDraftStream,
}));
// Reply dispatcher: replaced so the test can drive the partial/final callbacks itself.
vi.mock("../auto-reply/reply/provider-dispatcher.js", () => ({
dispatchReplyWithBufferedBlockDispatcher,
}));
// Delivery: replaced so the test can inspect the delivery arguments.
vi.mock("./bot/delivery.js", () => ({
deliverReplies,
}));
// Sticker helpers are replaced with bare stubs; the test below never configures
// or asserts on them.
vi.mock("./sticker-cache.js", () => ({
cacheSticker: vi.fn(),
describeStickerImage: vi.fn(),
}));
import { dispatchTelegramMessage } from "./bot-message-dispatch.js";
// Verifies that dispatchTelegramMessage wires draft streaming for a private chat
// whose inbound message carries a thread id, and that the same thread id is
// forwarded to both the draft stream and the final delivery.
describe("dispatchTelegramMessage draft streaming", () => {
  // Every test starts from unconfigured mocks.
  beforeEach(() => {
    for (const mock of [
      createTelegramDraftStream,
      dispatchReplyWithBufferedBlockDispatcher,
      deliverReplies,
    ]) {
      mock.mockReset();
    }
  });

  it("streams drafts in private threads and forwards thread id", async () => {
    const chatId = 123;
    const threadId = 777;
    const privateChat = () => ({ id: chatId, type: "private" });

    const draftStream = {
      update: vi.fn(),
      flush: vi.fn().mockResolvedValue(undefined),
      stop: vi.fn(),
    };
    createTelegramDraftStream.mockReturnValue(draftStream);
    deliverReplies.mockResolvedValue({ delivered: true });
    // Stand-in dispatcher: emits one partial reply, then delivers the final payload.
    dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
      async ({ dispatcherOptions, replyOptions }) => {
        await replyOptions?.onPartialReply?.({ text: "Hello" });
        await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
        return { queuedFinal: true };
      },
    );
    const resolveBotTopicsEnabled = vi.fn().mockResolvedValue(true);

    // Private chat: resolvedThreadId stays undefined while replyThreadId mirrors
    // the message_thread_id of the inbound message.
    const context = {
      ctxPayload: {},
      primaryCtx: { message: { chat: privateChat() } },
      msg: {
        chat: privateChat(),
        message_id: 456,
        message_thread_id: threadId,
      },
      chatId,
      isGroup: false,
      resolvedThreadId: undefined,
      replyThreadId: threadId,
      historyKey: undefined,
      historyLimit: 0,
      groupHistories: new Map(),
      route: { agentId: "default", accountId: "default" },
      skillFilter: undefined,
      sendTyping: vi.fn(),
      sendRecordVoice: vi.fn(),
      ackReactionPromise: null,
      reactionApi: null,
      removeAckAfterReply: false,
    };

    await dispatchTelegramMessage({
      context,
      bot: { api: {} },
      cfg: {},
      runtime: {},
      replyToMode: "first",
      streamMode: "partial",
      textLimit: 4096,
      telegramCfg: {},
      opts: {},
      resolveBotTopicsEnabled,
    });

    expect(resolveBotTopicsEnabled).toHaveBeenCalledWith(context.primaryCtx);

    const draftStreamArgs = expect.objectContaining({ chatId, messageThreadId: threadId });
    expect(createTelegramDraftStream).toHaveBeenCalledWith(draftStreamArgs);
    expect(draftStream.update).toHaveBeenCalledWith("Hello");

    const deliveryArgs = expect.objectContaining({ messageThreadId: threadId });
    expect(deliverReplies).toHaveBeenCalledWith(deliveryArgs);
  });
});

View file

@ -1,10 +1,10 @@
// @ts-nocheck
import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js";
import {
findModelInCatalog,
loadModelCatalog,
modelSupportsVision,
} from "../agents/model-catalog.js";
import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js";
import { resolveDefaultModelForAgent } from "../agents/model-selection.js";
import { resolveChunkMode } from "../auto-reply/chunk.js";
import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js";
@ -55,7 +55,7 @@ export const dispatchTelegramMessage = async ({
msg,
chatId,
isGroup,
resolvedThreadId,
replyThreadId,
historyKey,
historyLimit,
groupHistories,
@ -69,11 +69,12 @@ export const dispatchTelegramMessage = async ({
} = context;
const isPrivateChat = msg.chat.type === "private";
const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id;
const draftMaxChars = Math.min(textLimit, 4096);
const canStreamDraft =
streamMode !== "off" &&
isPrivateChat &&
typeof resolvedThreadId === "number" &&
typeof messageThreadId === "number" &&
(await resolveBotTopicsEnabled(primaryCtx));
const draftStream = canStreamDraft
? createTelegramDraftStream({
@ -81,7 +82,7 @@ export const dispatchTelegramMessage = async ({
chatId,
draftId: msg.message_id || Date.now(),
maxChars: draftMaxChars,
messageThreadId: resolvedThreadId,
messageThreadId: replyThreadId,
log: logVerbose,
warn: logVerbose,
})
@ -240,7 +241,7 @@ export const dispatchTelegramMessage = async ({
bot,
replyToMode,
textLimit,
messageThreadId: resolvedThreadId,
messageThreadId: replyThreadId,
tableMode,
chunkMode,
onVoiceRecording: sendRecordVoice,
@ -273,15 +274,8 @@ export const dispatchTelegramMessage = async ({
},
replyOptions: {
skillFilter,
onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined,
onReasoningStream: draftStream
? (payload) => {
if (payload.text) {
draftStream.update(payload.text);
}
}
: undefined,
disableBlockStreaming,
onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined,
onModelSelected: (ctx) => {
prefixContext.onModelSelected(ctx);
},
@ -298,7 +292,7 @@ export const dispatchTelegramMessage = async ({
bot,
replyToMode,
textLimit,
messageThreadId: resolvedThreadId,
messageThreadId: replyThreadId,
tableMode,
chunkMode,
linkPreview: telegramCfg.linkPreview,

View file

@ -24,7 +24,6 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
import { formatUncaughtError } from "../infra/errors.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import type { RuntimeEnv } from "../runtime.js";
@ -44,6 +43,7 @@ import {
resolveTelegramUpdateId,
type TelegramUpdateKeyContext,
} from "./bot-updates.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import { resolveTelegramFetch } from "./fetch.js";
import { wasSentByBot } from "./sent-message-cache.js";
@ -245,7 +245,6 @@ export function createTelegramBot(opts: TelegramBotOptions) {
: undefined) ??
(opts.allowFrom && opts.allowFrom.length > 0 ? opts.allowFrom : undefined);
const replyToMode = opts.replyToMode ?? telegramCfg.replyToMode ?? "first";
const streamMode = resolveTelegramStreamMode(telegramCfg);
const nativeEnabled = resolveNativeCommandsEnabled({
providerId: "telegram",
providerSetting: telegramCfg.commands?.native,
@ -264,6 +263,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
const ackReactionScope = cfg.messages?.ackReactionScope ?? "group-mentions";
const mediaMaxBytes = (opts.mediaMaxMb ?? telegramCfg.mediaMaxMb ?? 5) * 1024 * 1024;
const logger = getChildLogger({ module: "telegram-auto-reply" });
const streamMode = resolveTelegramStreamMode(telegramCfg);
let botHasTopicsEnabled: boolean | undefined;
const resolveBotTopicsEnabled = async (ctx?: TelegramContext) => {
const fromCtx = ctx?.me as { has_topics_enabled?: boolean } | undefined;
@ -274,6 +274,10 @@ export function createTelegramBot(opts: TelegramBotOptions) {
if (typeof botHasTopicsEnabled === "boolean") {
return botHasTopicsEnabled;
}
if (typeof bot.api.getMe !== "function") {
botHasTopicsEnabled = false;
return botHasTopicsEnabled;
}
try {
const me = (await withTelegramApiErrorLogging({
operation: "getMe",

View file

@ -1,5 +1,4 @@
import { formatLocationText, type NormalizedLocation } from "../../channels/location.js";
import type { TelegramAccountConfig } from "../../config/types.telegram.js";
import type {
TelegramForwardChat,
TelegramForwardOrigin,
@ -61,9 +60,9 @@ export function buildTypingThreadParams(messageThreadId?: number) {
return { message_thread_id: Math.trunc(messageThreadId) };
}
export function resolveTelegramStreamMode(
telegramCfg: Pick<TelegramAccountConfig, "streamMode"> | undefined,
): TelegramStreamMode {
export function resolveTelegramStreamMode(telegramCfg?: {
streamMode?: TelegramStreamMode;
}): TelegramStreamMode {
const raw = telegramCfg?.streamMode?.trim().toLowerCase();
if (raw === "off" || raw === "partial" || raw === "block") {
return raw;

View file

@ -0,0 +1,35 @@
import { describe, expect, it, vi } from "vitest";
import { createTelegramDraftStream } from "./draft-stream.js";
// Checks which thread parameters createTelegramDraftStream forwards to
// `sendMessageDraft` for a given `messageThreadId`.
describe("createTelegramDraftStream", () => {
  // Creates a stream over a stubbed API, pushes a single "Hello" update, and
  // returns the `sendMessageDraft` mock for inspection.
  const sendHelloDraft = (messageThreadId: number) => {
    const api = { sendMessageDraft: vi.fn().mockResolvedValue(true) };
    const stream = createTelegramDraftStream({
      api: api as any,
      chatId: 123,
      draftId: 42,
      messageThreadId,
    });
    stream.update("Hello");
    return api.sendMessageDraft;
  };

  it("passes message_thread_id when provided", () => {
    const sendMessageDraft = sendHelloDraft(99);

    expect(sendMessageDraft).toHaveBeenCalledWith(123, 42, "Hello", {
      message_thread_id: 99,
    });
  });

  it("omits message_thread_id for general topic id", () => {
    const sendMessageDraft = sendHelloDraft(1);

    expect(sendMessageDraft).toHaveBeenCalledWith(123, 42, "Hello", undefined);
  });
});

View file

@ -1,4 +1,5 @@
import type { Bot } from "grammy";
import { buildTelegramThreadParams } from "./bot/helpers.js";
const TELEGRAM_DRAFT_MAX_CHARS = 4096;
const DEFAULT_THROTTLE_MS = 300;
@ -24,10 +25,7 @@ export function createTelegramDraftStream(params: {
const rawDraftId = Number.isFinite(params.draftId) ? Math.trunc(params.draftId) : 1;
const draftId = rawDraftId === 0 ? 1 : Math.abs(rawDraftId);
const chatId = params.chatId;
const threadParams =
typeof params.messageThreadId === "number"
? { message_thread_id: Math.trunc(params.messageThreadId) }
: undefined;
const threadParams = buildTelegramThreadParams(params.messageThreadId);
let lastSentText = "";
let lastSentAt = 0;