openclaw-vainplex/src/web/auto-reply.ts
Peter Steinberger c5677df56e Increase watchdog timeout to 30 minutes
Changed from 10 to 30 minutes to avoid false positives when
heartbeatMinutes is set to 10. The watchdog should be significantly
longer than the heartbeat interval to account for:
- Network latency
- Slow command responses
- Brief connection hiccups

With heartbeatMinutes=10, a 30-minute watchdog gives 3x buffer before
triggering auto-restart.
2025-11-30 18:03:19 +00:00

1028 lines
32 KiB
TypeScript

import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import { waitForever } from "../cli/wait.js";
import { loadConfig } from "../config/config.js";
import {
DEFAULT_IDLE_MINUTES,
deriveSessionKey,
loadSessionStore,
resolveStorePath,
saveSessionStore,
} from "../config/sessions.js";
import { danger, isVerbose, logVerbose, success } from "../globals.js";
import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { monitorWebInbox } from "./inbound.js";
import { loadWebMedia } from "./media.js";
import { sendMessageWeb } from "./outbound.js";
import {
computeBackoff,
newConnectionId,
type ReconnectPolicy,
resolveHeartbeatSeconds,
resolveReconnectPolicy,
sleepWithAbort,
} from "./reconnect.js";
import { getWebAuthAgeMs } from "./session.js";
const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024;
type WebInboundMsg = Parameters<
typeof monitorWebInbox
>[0]["onMessage"] extends (msg: infer M) => unknown
? M
: never;
export type WebMonitorTuning = {
reconnect?: Partial<ReconnectPolicy>;
heartbeatSeconds?: number;
replyHeartbeatMinutes?: number;
replyHeartbeatNow?: boolean;
sleep?: (ms: number, signal?: AbortSignal) => Promise<void>;
};
const formatDuration = (ms: number) =>
ms >= 1000 ? `${(ms / 1000).toFixed(2)}s` : `${ms}ms`;
const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30;
export const HEARTBEAT_TOKEN = "HEARTBEAT_OK";
export const HEARTBEAT_PROMPT = "HEARTBEAT ultrathink";
export function resolveReplyHeartbeatMinutes(
cfg: ReturnType<typeof loadConfig>,
overrideMinutes?: number,
) {
const raw = overrideMinutes ?? cfg.inbound?.reply?.heartbeatMinutes;
if (raw === 0) return null;
if (typeof raw === "number" && raw > 0) return raw;
return cfg.inbound?.reply?.mode === "command"
? DEFAULT_REPLY_HEARTBEAT_MINUTES
: null;
}
export function stripHeartbeatToken(raw?: string) {
if (!raw) return { shouldSkip: true, text: "" };
const trimmed = raw.trim();
if (!trimmed) return { shouldSkip: true, text: "" };
if (trimmed === HEARTBEAT_TOKEN) return { shouldSkip: true, text: "" };
const withoutToken = trimmed.replaceAll(HEARTBEAT_TOKEN, "").trim();
return {
shouldSkip: withoutToken.length === 0,
text: withoutToken || trimmed,
};
}
export async function runWebHeartbeatOnce(opts: {
cfg?: ReturnType<typeof loadConfig>;
to: string;
verbose?: boolean;
replyResolver?: typeof getReplyFromConfig;
runtime?: RuntimeEnv;
sender?: typeof sendMessageWeb;
sessionId?: string;
overrideBody?: string;
dryRun?: boolean;
}) {
const {
cfg: cfgOverride,
to,
verbose = false,
sessionId,
overrideBody,
dryRun = false,
} = opts;
const _runtime = opts.runtime ?? defaultRuntime;
const replyResolver = opts.replyResolver ?? getReplyFromConfig;
const sender = opts.sender ?? sendMessageWeb;
const runId = newConnectionId();
const heartbeatLogger = getChildLogger({
module: "web-heartbeat",
runId,
to,
});
const cfg = cfgOverride ?? loadConfig();
if (sessionId) {
const storePath = resolveStorePath(cfg.inbound?.reply?.session?.store);
const store = loadSessionStore(storePath);
store[to] = {
...(store[to] ?? {}),
sessionId,
updatedAt: Date.now(),
};
await saveSessionStore(storePath, store);
}
const sessionSnapshot = getSessionSnapshot(cfg, to, true);
if (verbose) {
heartbeatLogger.info(
{
to,
sessionKey: sessionSnapshot.key,
sessionId: sessionId ?? sessionSnapshot.entry?.sessionId ?? null,
sessionFresh: sessionSnapshot.fresh,
idleMinutes: sessionSnapshot.idleMinutes,
},
"heartbeat session snapshot",
);
}
if (overrideBody && overrideBody.trim().length === 0) {
throw new Error("Override body must be non-empty when provided.");
}
try {
if (overrideBody) {
if (dryRun) {
console.log(
success(
`[dry-run] web send -> ${to}: ${overrideBody.trim()} (manual message)`,
),
);
return;
}
const sendResult = await sender(to, overrideBody, { verbose });
heartbeatLogger.info(
{
to,
messageId: sendResult.messageId,
chars: overrideBody.length,
reason: "manual-message",
},
"manual heartbeat message sent",
);
console.log(
success(
`sent manual message to ${to} (web), id ${sendResult.messageId}`,
),
);
return;
}
const replyResult = await replyResolver(
{
Body: HEARTBEAT_PROMPT,
From: to,
To: to,
MessageSid: sessionId ?? sessionSnapshot.entry?.sessionId,
},
undefined,
cfg,
);
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
) {
heartbeatLogger.info(
{
to,
reason: "empty-reply",
sessionId: sessionSnapshot.entry?.sessionId ?? null,
},
"heartbeat skipped",
);
if (verbose) console.log(success("heartbeat: ok (empty reply)"));
return;
}
const hasMedia = Boolean(
replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0,
);
const stripped = stripHeartbeatToken(replyResult.text);
if (stripped.shouldSkip && !hasMedia) {
// Don't let heartbeats keep sessions alive: restore previous updatedAt so idle expiry still works.
const sessionCfg = cfg.inbound?.reply?.session;
const storePath = resolveStorePath(sessionCfg?.store);
const store = loadSessionStore(storePath);
if (sessionSnapshot.entry && store[sessionSnapshot.key]) {
store[sessionSnapshot.key].updatedAt = sessionSnapshot.entry.updatedAt;
await saveSessionStore(storePath, store);
}
heartbeatLogger.info(
{ to, reason: "heartbeat-token", rawLength: replyResult.text?.length },
"heartbeat skipped",
);
console.log(success("heartbeat: ok (HEARTBEAT_OK)"));
return;
}
if (hasMedia) {
heartbeatLogger.warn(
{ to },
"heartbeat reply contained media; sending text only",
);
}
const finalText = stripped.text || replyResult.text || "";
if (dryRun) {
heartbeatLogger.info(
{ to, reason: "dry-run", chars: finalText.length },
"heartbeat dry-run",
);
console.log(
success(`[dry-run] heartbeat -> ${to}: ${finalText.slice(0, 200)}`),
);
return;
}
const sendResult = await sender(to, finalText, { verbose });
heartbeatLogger.info(
{ to, messageId: sendResult.messageId, chars: finalText.length },
"heartbeat sent",
);
console.log(success(`heartbeat: alert sent to ${to}`));
} catch (err) {
heartbeatLogger.warn({ to, error: String(err) }, "heartbeat failed");
console.log(danger(`heartbeat: failed - ${String(err)}`));
throw err;
}
}
function getFallbackRecipient(cfg: ReturnType<typeof loadConfig>) {
const sessionCfg = cfg.inbound?.reply?.session;
const storePath = resolveStorePath(sessionCfg?.store);
const store = loadSessionStore(storePath);
const candidates = Object.entries(store).filter(([key]) => key !== "global");
if (candidates.length === 0) {
const allowFrom =
Array.isArray(cfg.inbound?.allowFrom) && cfg.inbound.allowFrom.length > 0
? cfg.inbound.allowFrom.filter((v) => v !== "*")
: [];
if (allowFrom.length === 0) return null;
return allowFrom[0] ? normalizeE164(allowFrom[0]) : null;
}
const mostRecent = candidates.sort(
(a, b) => (b[1]?.updatedAt ?? 0) - (a[1]?.updatedAt ?? 0),
)[0];
return mostRecent ? normalizeE164(mostRecent[0]) : null;
}
function getSessionRecipients(cfg: ReturnType<typeof loadConfig>) {
const sessionCfg = cfg.inbound?.reply?.session;
const scope = sessionCfg?.scope ?? "per-sender";
if (scope === "global") return [];
const storePath = resolveStorePath(sessionCfg?.store);
const store = loadSessionStore(storePath);
return Object.entries(store)
.filter(([key]) => key !== "global" && key !== "unknown")
.map(([key, entry]) => ({
to: normalizeE164(key),
updatedAt: entry?.updatedAt ?? 0,
}))
.filter(({ to }) => Boolean(to))
.sort((a, b) => b.updatedAt - a.updatedAt);
}
export function resolveHeartbeatRecipients(
cfg: ReturnType<typeof loadConfig>,
opts: { to?: string; all?: boolean } = {},
) {
if (opts.to) return { recipients: [normalizeE164(opts.to)], source: "flag" };
const sessionRecipients = getSessionRecipients(cfg);
const allowFrom =
Array.isArray(cfg.inbound?.allowFrom) && cfg.inbound.allowFrom.length > 0
? cfg.inbound.allowFrom.filter((v) => v !== "*").map(normalizeE164)
: [];
const unique = (list: string[]) => [...new Set(list.filter(Boolean))];
if (opts.all) {
const all = unique([...sessionRecipients.map((s) => s.to), ...allowFrom]);
return { recipients: all, source: "all" as const };
}
if (sessionRecipients.length === 1) {
return { recipients: [sessionRecipients[0].to], source: "session-single" };
}
if (sessionRecipients.length > 1) {
return {
recipients: sessionRecipients.map((s) => s.to),
source: "session-ambiguous" as const,
};
}
return { recipients: allowFrom, source: "allowFrom" as const };
}
function getSessionSnapshot(
cfg: ReturnType<typeof loadConfig>,
from: string,
isHeartbeat = false,
) {
const sessionCfg = cfg.inbound?.reply?.session;
const scope = sessionCfg?.scope ?? "per-sender";
const key = deriveSessionKey(scope, { From: from, To: "", Body: "" });
const store = loadSessionStore(resolveStorePath(sessionCfg?.store));
const entry = store[key];
const idleMinutes = Math.max(
(isHeartbeat
? (sessionCfg?.heartbeatIdleMinutes ?? sessionCfg?.idleMinutes)
: sessionCfg?.idleMinutes) ?? DEFAULT_IDLE_MINUTES,
1,
);
const fresh = !!(
entry && Date.now() - entry.updatedAt <= idleMinutes * 60_000
);
return { key, entry, fresh, idleMinutes };
}
async function deliverWebReply(params: {
replyResult: ReplyPayload;
msg: WebInboundMsg;
maxMediaBytes: number;
replyLogger: ReturnType<typeof getChildLogger>;
runtime: RuntimeEnv;
connectionId?: string;
skipLog?: boolean;
}) {
const {
replyResult,
msg,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
skipLog,
} = params;
const replyStarted = Date.now();
const mediaList = replyResult.mediaUrls?.length
? replyResult.mediaUrls
: replyResult.mediaUrl
? [replyResult.mediaUrl]
: [];
if (mediaList.length === 0 && replyResult.text) {
await msg.reply(replyResult.text || "");
if (!skipLog) {
logInfo(
`✅ Sent web reply to ${msg.from} (${(Date.now() - replyStarted).toFixed(0)}ms)`,
runtime,
);
}
replyLogger.info(
{
correlationId: msg.id ?? newConnectionId(),
connectionId: connectionId ?? null,
to: msg.from,
from: msg.to,
text: replyResult.text,
mediaUrl: null,
mediaSizeBytes: null,
mediaKind: null,
durationMs: Date.now() - replyStarted,
},
"auto-reply sent (text)",
);
return;
}
const cleanText = replyResult.text ?? undefined;
for (const [index, mediaUrl] of mediaList.entries()) {
try {
const media = await loadWebMedia(mediaUrl, maxMediaBytes);
if (isVerbose()) {
logVerbose(
`Web auto-reply media size: ${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB`,
);
logVerbose(
`Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`,
);
}
const caption = index === 0 ? cleanText || undefined : undefined;
if (media.kind === "image") {
await msg.sendMedia({
image: media.buffer,
caption,
mimetype: media.contentType,
});
} else if (media.kind === "audio") {
await msg.sendMedia({
audio: media.buffer,
ptt: true,
mimetype: media.contentType,
caption,
});
} else if (media.kind === "video") {
await msg.sendMedia({
video: media.buffer,
caption,
mimetype: media.contentType,
});
} else {
const fileName = mediaUrl.split("/").pop() ?? "file";
const mimetype = media.contentType ?? "application/octet-stream";
await msg.sendMedia({
document: media.buffer,
fileName,
caption,
mimetype,
});
}
logInfo(
`✅ Sent web media reply to ${msg.from} (${(media.buffer.length / (1024 * 1024)).toFixed(2)}MB)`,
runtime,
);
replyLogger.info(
{
correlationId: msg.id ?? newConnectionId(),
connectionId: connectionId ?? null,
to: msg.from,
from: msg.to,
text: index === 0 ? (cleanText ?? null) : null,
mediaUrl,
mediaSizeBytes: media.buffer.length,
mediaKind: media.kind,
durationMs: Date.now() - replyStarted,
},
"auto-reply sent (media)",
);
} catch (err) {
console.error(
danger(`Failed sending web media to ${msg.from}: ${String(err)}`),
);
if (index === 0 && cleanText) {
console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`);
await msg.reply(cleanText || "");
}
}
}
}
export async function monitorWebProvider(
verbose: boolean,
listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
keepAlive = true,
replyResolver: typeof getReplyFromConfig | undefined = getReplyFromConfig,
runtime: RuntimeEnv = defaultRuntime,
abortSignal?: AbortSignal,
tuning: WebMonitorTuning = {},
) {
const runId = newConnectionId();
const replyLogger = getChildLogger({ module: "web-auto-reply", runId });
const heartbeatLogger = getChildLogger({ module: "web-heartbeat", runId });
const reconnectLogger = getChildLogger({ module: "web-reconnect", runId });
const cfg = loadConfig();
const configuredMaxMb = cfg.inbound?.reply?.mediaMaxMb;
const maxMediaBytes =
typeof configuredMaxMb === "number" && configuredMaxMb > 0
? configuredMaxMb * 1024 * 1024
: DEFAULT_WEB_MEDIA_BYTES;
const heartbeatSeconds = resolveHeartbeatSeconds(
cfg,
tuning.heartbeatSeconds,
);
const replyHeartbeatMinutes = resolveReplyHeartbeatMinutes(
cfg,
tuning.replyHeartbeatMinutes,
);
const reconnectPolicy = resolveReconnectPolicy(cfg, tuning.reconnect);
const sleep =
tuning.sleep ??
((ms: number, signal?: AbortSignal) =>
sleepWithAbort(ms, signal ?? abortSignal));
const stopRequested = () => abortSignal?.aborted === true;
const abortPromise =
abortSignal &&
new Promise<"aborted">((resolve) =>
abortSignal.addEventListener("abort", () => resolve("aborted"), {
once: true,
}),
);
let sigintStop = false;
const handleSigint = () => {
sigintStop = true;
};
process.once("SIGINT", handleSigint);
let reconnectAttempts = 0;
// Track recently sent messages to prevent echo loops
const recentlySent = new Set<string>();
const MAX_RECENT_MESSAGES = 100;
while (true) {
if (stopRequested()) break;
const connectionId = newConnectionId();
const startedAt = Date.now();
let heartbeat: NodeJS.Timeout | null = null;
let replyHeartbeatTimer: NodeJS.Timeout | null = null;
let watchdogTimer: NodeJS.Timeout | null = null;
let lastMessageAt: number | null = null;
let handledMessages = 0;
let lastInboundMsg: WebInboundMsg | null = null;
// Watchdog to detect stuck message processing (e.g., event emitter died)
// Should be significantly longer than heartbeatMinutes to avoid false positives
const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages
const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
onMessage: async (msg) => {
handledMessages += 1;
lastMessageAt = Date.now();
const ts = msg.timestamp
? new Date(msg.timestamp).toISOString()
: new Date().toISOString();
const correlationId = msg.id ?? newConnectionId();
replyLogger.info(
{
connectionId,
correlationId,
from: msg.from,
to: msg.to,
body: msg.body,
mediaType: msg.mediaType ?? null,
mediaPath: msg.mediaPath ?? null,
},
"inbound web message",
);
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`);
// Detect same-phone mode (self-messaging)
const isSamePhoneMode = msg.from === msg.to;
if (isSamePhoneMode) {
logVerbose(`📱 Same-phone mode detected (from === to: ${msg.from})`);
}
// Skip if this is a message we just sent (echo detection)
if (recentlySent.has(msg.body)) {
console.log(`⏭️ Skipping echo: detected recently sent message`);
logVerbose(
`Skipping auto-reply: detected echo (message matches recently sent text)`,
);
recentlySent.delete(msg.body); // Remove from set to allow future identical messages
return;
}
logVerbose(
`Echo check: message not in recent set (size: ${recentlySent.size})`,
);
lastInboundMsg = msg;
// Build timestamp prefix (default: enabled with UTC)
// Can be: true (UTC), false (disabled), or "America/New_York" (custom timezone)
let timestampStr = "";
const tsCfg = cfg.inbound?.timestampPrefix;
const tsEnabled = tsCfg !== false; // default true
if (tsEnabled) {
const tz = typeof tsCfg === "string" ? tsCfg : "UTC";
const now = new Date();
try {
// Format: "Nov 29 06:30" - compact but informative
timestampStr = `[${now.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${now.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `;
} catch {
// Fallback to UTC if timezone invalid
timestampStr = `[${now.toISOString().slice(5, 16).replace("T", " ")}] `;
}
}
// Build message prefix: explicit config > default based on allowFrom
// If allowFrom is configured, user likely has a specific setup - no default prefix
// If no allowFrom, add "[warelay]" so AI knows it's coming through warelay
let messagePrefix = cfg.inbound?.messagePrefix;
if (messagePrefix === undefined) {
const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0;
messagePrefix = hasAllowFrom ? "" : "[warelay]";
}
const prefixStr = messagePrefix ? `${messagePrefix} ` : "";
const bodyForCommand = `${timestampStr}${prefixStr}${msg.body}`;
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: bodyForCommand,
From: msg.from,
To: msg.to,
MessageSid: msg.id,
MediaPath: msg.mediaPath,
MediaUrl: msg.mediaUrl,
MediaType: msg.mediaType,
},
{
onReplyStart: msg.sendComposing,
},
);
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
) {
logVerbose(
"Skipping auto-reply: no text/media returned from resolver",
);
return;
}
// Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match)
const responsePrefix = cfg.inbound?.responsePrefix;
if (responsePrefix && replyResult.text && replyResult.text.trim() !== HEARTBEAT_TOKEN) {
// Only add prefix if not already present
if (!replyResult.text.startsWith(responsePrefix)) {
replyResult.text = `${responsePrefix} ${replyResult.text}`;
}
}
try {
await deliverWebReply({
replyResult,
msg,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
// Track sent message to prevent echo loops
if (replyResult.text) {
recentlySent.add(replyResult.text);
logVerbose(
`Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`,
);
// Keep set bounded - remove oldest if too large
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
if (isVerbose()) {
console.log(
success(
`↩️ Auto-replied to ${msg.from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""})`,
),
);
} else {
console.log(
success(
`↩️ ${replyResult.text ?? "<media>"}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`,
),
);
}
} catch (err) {
console.error(
danger(
`Failed sending web auto-reply to ${msg.from}: ${String(err)}`,
),
);
}
},
});
const closeListener = async () => {
if (heartbeat) clearInterval(heartbeat);
if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer);
if (watchdogTimer) clearInterval(watchdogTimer);
try {
await listener.close();
} catch (err) {
logVerbose(`Socket close failed: ${String(err)}`);
}
};
if (keepAlive) {
heartbeat = setInterval(() => {
const authAgeMs = getWebAuthAgeMs();
const minutesSinceLastMessage = lastMessageAt
? Math.floor((Date.now() - lastMessageAt) / 60000)
: null;
const logData = {
connectionId,
reconnectAttempts,
messagesHandled: handledMessages,
lastMessageAt,
authAgeMs,
uptimeMs: Date.now() - startedAt,
...(minutesSinceLastMessage !== null && minutesSinceLastMessage > 30
? { minutesSinceLastMessage }
: {}),
};
// Warn if no messages in 30+ minutes
if (minutesSinceLastMessage && minutesSinceLastMessage > 30) {
heartbeatLogger.warn(logData, "⚠️ web relay heartbeat - no messages in 30+ minutes");
} else {
heartbeatLogger.info(logData, "web relay heartbeat");
}
}, heartbeatSeconds * 1000);
// Watchdog: Auto-restart if no messages received for MESSAGE_TIMEOUT_MS
watchdogTimer = setInterval(() => {
if (lastMessageAt) {
const timeSinceLastMessage = Date.now() - lastMessageAt;
if (timeSinceLastMessage > MESSAGE_TIMEOUT_MS) {
const minutesSinceLastMessage = Math.floor(timeSinceLastMessage / 60000);
heartbeatLogger.warn(
{
connectionId,
minutesSinceLastMessage,
lastMessageAt: new Date(lastMessageAt),
messagesHandled: handledMessages,
},
"Message timeout detected - forcing reconnect",
);
console.error(
`⚠️ No messages received in ${minutesSinceLastMessage}m - restarting connection`,
);
closeListener(); // Trigger reconnect
}
}
}, WATCHDOG_CHECK_MS);
}
const runReplyHeartbeat = async () => {
if (!replyHeartbeatMinutes) return;
const tickStart = Date.now();
if (!lastInboundMsg) {
const fallbackTo = getFallbackRecipient(cfg);
if (!fallbackTo) {
heartbeatLogger.info(
{
connectionId,
reason: "no-recent-inbound",
durationMs: Date.now() - tickStart,
},
"reply heartbeat skipped",
);
console.log(success("heartbeat: skipped (no recent inbound)"));
return;
}
const snapshot = getSessionSnapshot(cfg, fallbackTo, true);
if (!snapshot.entry) {
heartbeatLogger.info(
{ connectionId, to: fallbackTo, reason: "no-session-for-fallback" },
"reply heartbeat skipped",
);
console.log(success("heartbeat: skipped (no session to resume)"));
return;
}
if (isVerbose()) {
heartbeatLogger.info(
{
connectionId,
to: fallbackTo,
reason: "fallback-session",
sessionId: snapshot.entry?.sessionId ?? null,
sessionFresh: snapshot.fresh,
},
"reply heartbeat start",
);
}
await runWebHeartbeatOnce({
cfg,
to: fallbackTo,
verbose,
replyResolver,
runtime,
sessionId: snapshot.entry.sessionId,
});
heartbeatLogger.info(
{
connectionId,
to: fallbackTo,
...snapshot,
durationMs: Date.now() - tickStart,
},
"reply heartbeat sent (fallback session)",
);
return;
}
try {
const snapshot = getSessionSnapshot(cfg, lastInboundMsg.from);
if (isVerbose()) {
heartbeatLogger.info(
{
connectionId,
to: lastInboundMsg.from,
intervalMinutes: replyHeartbeatMinutes,
sessionKey: snapshot.key,
sessionId: snapshot.entry?.sessionId ?? null,
sessionFresh: snapshot.fresh,
},
"reply heartbeat start",
);
}
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: HEARTBEAT_PROMPT,
From: lastInboundMsg.from,
To: lastInboundMsg.to,
MessageSid: snapshot.entry?.sessionId,
MediaPath: undefined,
MediaUrl: undefined,
MediaType: undefined,
},
{
onReplyStart: lastInboundMsg.sendComposing,
},
);
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
) {
heartbeatLogger.info(
{
connectionId,
durationMs: Date.now() - tickStart,
reason: "empty-reply",
},
"reply heartbeat skipped",
);
console.log(success("heartbeat: ok (empty reply)"));
return;
}
const stripped = stripHeartbeatToken(replyResult.text);
const hasMedia = Boolean(
replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0,
);
if (stripped.shouldSkip && !hasMedia) {
heartbeatLogger.info(
{
connectionId,
durationMs: Date.now() - tickStart,
reason: "heartbeat-token",
rawLength: replyResult.text?.length ?? 0,
},
"reply heartbeat skipped",
);
console.log(success("heartbeat: ok (HEARTBEAT_OK)"));
return;
}
const cleanedReply: ReplyPayload = {
...replyResult,
text: stripped.text,
};
await deliverWebReply({
replyResult: cleanedReply,
msg: lastInboundMsg,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
const durationMs = Date.now() - tickStart;
const summary = `heartbeat: alert sent (${formatDuration(durationMs)})`;
console.log(summary);
heartbeatLogger.info(
{
connectionId,
durationMs,
hasMedia,
chars: stripped.text?.length ?? 0,
},
"reply heartbeat sent",
);
} catch (err) {
const durationMs = Date.now() - tickStart;
heartbeatLogger.warn(
{
connectionId,
error: String(err),
durationMs,
},
"reply heartbeat failed",
);
console.log(
danger(`heartbeat: failed (${formatDuration(durationMs)})`),
);
}
};
if (replyHeartbeatMinutes && !replyHeartbeatTimer) {
const intervalMs = replyHeartbeatMinutes * 60_000;
replyHeartbeatTimer = setInterval(() => {
void runReplyHeartbeat();
}, intervalMs);
if (tuning.replyHeartbeatNow) {
void runReplyHeartbeat();
}
}
logInfo(
"📡 Listening for personal WhatsApp Web inbound messages. Leave this running; Ctrl+C to stop.",
runtime,
);
if (!keepAlive) {
await closeListener();
return;
}
const reason = await Promise.race([
listener.onClose?.catch((err) => {
reconnectLogger.error(
{ error: String(err) },
"listener.onClose rejected",
);
return { status: 500, isLoggedOut: false, error: err };
}) ?? waitForever(),
abortPromise ?? waitForever(),
]);
const uptimeMs = Date.now() - startedAt;
if (uptimeMs > heartbeatSeconds * 1000) {
reconnectAttempts = 0; // Healthy stretch; reset the backoff.
}
if (stopRequested() || sigintStop || reason === "aborted") {
await closeListener();
break;
}
const status =
(typeof reason === "object" && reason && "status" in reason
? (reason as { status?: number }).status
: undefined) ?? "unknown";
const loggedOut =
typeof reason === "object" &&
reason &&
"isLoggedOut" in reason &&
(reason as { isLoggedOut?: boolean }).isLoggedOut;
reconnectLogger.info(
{
connectionId,
status,
loggedOut,
reconnectAttempts,
},
"web reconnect: connection closed",
);
if (loggedOut) {
runtime.error(
danger(
"WhatsApp session logged out. Run `warelay login --provider web` to relink.",
),
);
await closeListener();
break;
}
reconnectAttempts += 1;
if (
reconnectPolicy.maxAttempts > 0 &&
reconnectAttempts >= reconnectPolicy.maxAttempts
) {
reconnectLogger.warn(
{
connectionId,
status,
reconnectAttempts,
maxAttempts: reconnectPolicy.maxAttempts,
},
"web reconnect: max attempts reached",
);
runtime.error(
danger(
`WhatsApp Web connection closed (status ${status}). Reached max retries (${reconnectPolicy.maxAttempts}); exiting so you can relink.`,
),
);
await closeListener();
break;
}
const delay = computeBackoff(reconnectPolicy, reconnectAttempts);
reconnectLogger.info(
{
connectionId,
status,
reconnectAttempts,
maxAttempts: reconnectPolicy.maxAttempts || "unlimited",
delayMs: delay,
},
"web reconnect: scheduling retry",
);
runtime.error(
danger(
`WhatsApp Web connection closed (status ${status}). Retry ${reconnectAttempts}/${reconnectPolicy.maxAttempts || "∞"} in ${formatDuration(delay)}`,
),
);
await closeListener();
try {
await sleep(delay, abortSignal);
} catch {
break;
}
}
process.removeListener("SIGINT", handleSigint);
}
export { DEFAULT_WEB_MEDIA_BYTES };