fix: route decrypted E2EE events to message handler

Previously room.decrypted_event was only logged but not processed,
causing encrypted messages (including files) to be ignored by agents
with encryption enabled (like Mona).
This commit is contained in:
Claudia 2026-02-01 19:22:54 +01:00
parent 2c653096eb
commit 8e36620140
11 changed files with 88 additions and 21 deletions

View file

@ -45,6 +45,9 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
}, },
handleAction: async (ctx: ChannelMessageActionContext) => { handleAction: async (ctx: ChannelMessageActionContext) => {
const { action, params, cfg } = ctx; const { action, params, cfg } = ctx;
// Get accountId from context for multi-account support
const accountId = (ctx as { accountId?: string }).accountId ?? undefined;
const resolveRoomId = () => const resolveRoomId = () =>
readStringParam(params, "roomId") ?? readStringParam(params, "roomId") ??
readStringParam(params, "channelId") ?? readStringParam(params, "channelId") ??
@ -67,6 +70,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
mediaUrl: mediaUrl ?? undefined, mediaUrl: mediaUrl ?? undefined,
replyToId: replyTo ?? undefined, replyToId: replyTo ?? undefined,
threadId: threadId ?? undefined, threadId: threadId ?? undefined,
accountId,
}, },
cfg, cfg,
); );
@ -83,6 +87,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
messageId, messageId,
emoji, emoji,
remove, remove,
accountId,
}, },
cfg, cfg,
); );
@ -97,6 +102,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
roomId: resolveRoomId(), roomId: resolveRoomId(),
messageId, messageId,
limit, limit,
accountId,
}, },
cfg, cfg,
); );
@ -111,6 +117,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
limit, limit,
before: readStringParam(params, "before"), before: readStringParam(params, "before"),
after: readStringParam(params, "after"), after: readStringParam(params, "after"),
accountId,
}, },
cfg, cfg,
); );
@ -125,6 +132,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
roomId: resolveRoomId(), roomId: resolveRoomId(),
messageId, messageId,
content, content,
accountId,
}, },
cfg, cfg,
); );
@ -137,6 +145,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
action: "deleteMessage", action: "deleteMessage",
roomId: resolveRoomId(), roomId: resolveRoomId(),
messageId, messageId,
accountId,
}, },
cfg, cfg,
); );
@ -153,6 +162,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins", action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins",
roomId: resolveRoomId(), roomId: resolveRoomId(),
messageId, messageId,
accountId,
}, },
cfg, cfg,
); );
@ -165,6 +175,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
action: "memberInfo", action: "memberInfo",
userId, userId,
roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"), roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"),
accountId,
}, },
cfg, cfg,
); );
@ -175,6 +186,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
{ {
action: "channelInfo", action: "channelInfo",
roomId: resolveRoomId(), roomId: resolveRoomId(),
accountId,
}, },
cfg, cfg,
); );

View file

@ -20,18 +20,23 @@ export async function resolveActionClient(
): Promise<MatrixActionClient> { ): Promise<MatrixActionClient> {
ensureNodeRuntime(); ensureNodeRuntime();
if (opts.client) return { client: opts.client, stopOnDone: false }; if (opts.client) return { client: opts.client, stopOnDone: false };
const active = getActiveMatrixClient();
// Try to get the active client for the specified account
const active = getActiveMatrixClient(opts.accountId);
if (active) return { client: active, stopOnDone: false }; if (active) return { client: active, stopOnDone: false };
const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT); const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT);
if (shouldShareClient) { if (shouldShareClient) {
const client = await resolveSharedMatrixClient({ const client = await resolveSharedMatrixClient({
cfg: getMatrixRuntime().config.loadConfig() as CoreConfig, cfg: getMatrixRuntime().config.loadConfig() as CoreConfig,
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
}); });
return { client, stopOnDone: false }; return { client, stopOnDone: false };
} }
const auth = await resolveMatrixAuth({ const auth = await resolveMatrixAuth({
cfg: getMatrixRuntime().config.loadConfig() as CoreConfig, cfg: getMatrixRuntime().config.loadConfig() as CoreConfig,
accountId: opts.accountId ?? undefined,
}); });
const client = await createMatrixClient({ const client = await createMatrixClient({
homeserver: auth.homeserver, homeserver: auth.homeserver,
@ -39,6 +44,7 @@ export async function resolveActionClient(
accessToken: auth.accessToken, accessToken: auth.accessToken,
encryption: auth.encryption, encryption: auth.encryption,
localTimeoutMs: opts.timeoutMs, localTimeoutMs: opts.timeoutMs,
accountId: opts.accountId ?? undefined,
}); });
if (auth.encryption && client.crypto) { if (auth.encryption && client.crypto) {
try { try {

View file

@ -26,6 +26,7 @@ export async function sendMatrixMessage(
threadId: opts.threadId, threadId: opts.threadId,
client: opts.client, client: opts.client,
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
}); });
} }

View file

@ -57,6 +57,7 @@ export type MatrixRawEvent = {
export type MatrixActionClientOpts = { export type MatrixActionClientOpts = {
client?: MatrixClient; client?: MatrixClient;
timeoutMs?: number; timeoutMs?: number;
accountId?: string | null;
}; };
export type MatrixMessageSummary = { export type MatrixMessageSummary = {

View file

@ -1,11 +1,34 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
let activeClient: MatrixClient | null = null; const DEFAULT_ACCOUNT_KEY = "default";
export function setActiveMatrixClient(client: MatrixClient | null): void { // Multi-account: Map of accountId -> client
activeClient = client; const activeClients = new Map<string, MatrixClient>();
function normalizeAccountKey(accountId?: string | null): string {
return accountId?.trim().toLowerCase() || DEFAULT_ACCOUNT_KEY;
} }
export function getActiveMatrixClient(): MatrixClient | null { export function setActiveMatrixClient(client: MatrixClient | null, accountId?: string | null): void {
return activeClient; const key = normalizeAccountKey(accountId);
if (client) {
activeClients.set(key, client);
} else {
activeClients.delete(key);
}
}
export function getActiveMatrixClient(accountId?: string | null): MatrixClient | null {
const key = normalizeAccountKey(accountId);
const client = activeClients.get(key);
if (client) return client;
// Fallback: if specific account not found, try default
if (key !== DEFAULT_ACCOUNT_KEY) {
return activeClients.get(DEFAULT_ACCOUNT_KEY) ?? null;
}
return null;
}
export function listActiveMatrixClients(): Array<{ accountId: string; client: MatrixClient }> {
return Array.from(activeClients.entries()).map(([accountId, client]) => ({ accountId, client }));
} }

View file

@ -38,6 +38,8 @@ export function registerMatrixMonitorEvents(params: {
const eventId = event?.event_id ?? "unknown"; const eventId = event?.event_id ?? "unknown";
const eventType = event?.type ?? "unknown"; const eventType = event?.type ?? "unknown";
logVerboseMessage(`matrix: decrypted event room=${roomId} type=${eventType} id=${eventId}`); logVerboseMessage(`matrix: decrypted event room=${roomId} type=${eventType} id=${eventId}`);
// Process decrypted messages through the normal handler
onRoomMessage(roomId, event);
}); });
client.on( client.on(

View file

@ -328,6 +328,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
? content.file ? content.file
: undefined; : undefined;
const mediaUrl = contentUrl ?? contentFile?.url; const mediaUrl = contentUrl ?? contentFile?.url;
// DEBUG: Log media detection
const msgtype = "msgtype" in content ? content.msgtype : undefined;
logVerboseMessage(`matrix: content check msgtype=${msgtype} contentUrl=${contentUrl ?? "none"} mediaUrl=${mediaUrl ?? "none"} rawBody="${rawBody.slice(0,50)}"`);
if (!rawBody && !mediaUrl) { if (!rawBody && !mediaUrl) {
return; return;
} }
@ -340,6 +345,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const contentSize = const contentSize =
typeof contentInfo?.size === "number" ? contentInfo.size : undefined; typeof contentInfo?.size === "number" ? contentInfo.size : undefined;
if (mediaUrl?.startsWith("mxc://")) { if (mediaUrl?.startsWith("mxc://")) {
logVerboseMessage(`matrix: attempting media download url=${mediaUrl} size=${contentSize ?? "unknown"} maxBytes=${mediaMaxBytes}`);
try { try {
media = await downloadMatrixMedia({ media = await downloadMatrixMedia({
client, client,
@ -349,9 +355,12 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
maxBytes: mediaMaxBytes, maxBytes: mediaMaxBytes,
file: contentFile, file: contentFile,
}); });
logVerboseMessage(`matrix: media download success path=${media?.path ?? "none"}`);
} catch (err) { } catch (err) {
logVerboseMessage(`matrix: media download failed: ${String(err)}`); logVerboseMessage(`matrix: media download failed: ${String(err)}`);
} }
} else if (mediaUrl) {
logVerboseMessage(`matrix: skipping non-mxc media url=${mediaUrl}`);
} }
const bodyText = rawBody || media?.placeholder || ""; const bodyText = rawBody || media?.placeholder || "";
@ -544,7 +553,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}), }),
); );
if (shouldAckReaction() && messageId) { if (shouldAckReaction() && messageId) {
reactMatrixMessage(roomId, messageId, ackReaction, client).catch((err) => { reactMatrixMessage(roomId, messageId, ackReaction, { client }).catch((err) => {
logVerboseMessage(`matrix react failed for room ${roomId}: ${String(err)}`); logVerboseMessage(`matrix react failed for room ${roomId}: ${String(err)}`);
}); });
} }

View file

@ -178,7 +178,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
startClient: false, startClient: false,
accountId: opts.accountId, accountId: opts.accountId,
}); });
setActiveMatrixClient(client); setActiveMatrixClient(client, opts.accountId);
const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg); const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg);
const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
@ -267,7 +267,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
logVerboseMessage(`matrix: stopping client for account ${opts.accountId ?? "default"}`); logVerboseMessage(`matrix: stopping client for account ${opts.accountId ?? "default"}`);
stopSharedClient(opts.accountId); stopSharedClient(opts.accountId);
} finally { } finally {
setActiveMatrixClient(null); setActiveMatrixClient(null, opts.accountId);
resolve(); resolve();
} }
}; };

View file

@ -46,6 +46,7 @@ export async function sendMessageMatrix(
const { client, stopOnDone } = await resolveMatrixClient({ const { client, stopOnDone } = await resolveMatrixClient({
client: opts.client, client: opts.client,
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
}); });
try { try {
const roomId = await resolveMatrixRoomId(client, to); const roomId = await resolveMatrixRoomId(client, to);
@ -229,13 +230,14 @@ export async function reactMatrixMessage(
roomId: string, roomId: string,
messageId: string, messageId: string,
emoji: string, emoji: string,
client?: MatrixClient, opts: { client?: MatrixClient; accountId?: string | null } = {},
): Promise<void> { ): Promise<void> {
if (!emoji.trim()) { if (!emoji.trim()) {
throw new Error("Matrix reaction requires an emoji"); throw new Error("Matrix reaction requires an emoji");
} }
const { client: resolved, stopOnDone } = await resolveMatrixClient({ const { client: resolved, stopOnDone } = await resolveMatrixClient({
client, client: opts.client,
accountId: opts.accountId,
}); });
try { try {
const resolvedRoom = await resolveMatrixRoomId(resolved, roomId); const resolvedRoom = await resolveMatrixRoomId(resolved, roomId);

View file

@ -29,25 +29,31 @@ export function resolveMediaMaxBytes(): number | undefined {
export async function resolveMatrixClient(opts: { export async function resolveMatrixClient(opts: {
client?: MatrixClient; client?: MatrixClient;
timeoutMs?: number; timeoutMs?: number;
accountId?: string | null;
}): Promise<{ client: MatrixClient; stopOnDone: boolean }> { }): Promise<{ client: MatrixClient; stopOnDone: boolean }> {
ensureNodeRuntime(); ensureNodeRuntime();
if (opts.client) return { client: opts.client, stopOnDone: false }; if (opts.client) return { client: opts.client, stopOnDone: false };
const active = getActiveMatrixClient();
// Try to get the active client for the specified account
const active = getActiveMatrixClient(opts.accountId);
if (active) return { client: active, stopOnDone: false }; if (active) return { client: active, stopOnDone: false };
const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT); const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT);
if (shouldShareClient) { if (shouldShareClient) {
const client = await resolveSharedMatrixClient({ const client = await resolveSharedMatrixClient({
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
}); });
return { client, stopOnDone: false }; return { client, stopOnDone: false };
} }
const auth = await resolveMatrixAuth(); const auth = await resolveMatrixAuth({ accountId: opts.accountId ?? undefined });
const client = await createMatrixClient({ const client = await createMatrixClient({
homeserver: auth.homeserver, homeserver: auth.homeserver,
userId: auth.userId, userId: auth.userId,
accessToken: auth.accessToken, accessToken: auth.accessToken,
encryption: auth.encryption, encryption: auth.encryption,
localTimeoutMs: opts.timeoutMs, localTimeoutMs: opts.timeoutMs,
accountId: opts.accountId ?? undefined,
}); });
if (auth.encryption && client.crypto) { if (auth.encryption && client.crypto) {
try { try {

View file

@ -39,6 +39,7 @@ export async function handleMatrixAction(
cfg: CoreConfig, cfg: CoreConfig,
): Promise<AgentToolResult<unknown>> { ): Promise<AgentToolResult<unknown>> {
const action = readStringParam(params, "action", { required: true }); const action = readStringParam(params, "action", { required: true });
const accountId = readStringParam(params, "accountId") ?? undefined;
const isActionEnabled = createActionGate(cfg.channels?.matrix?.actions); const isActionEnabled = createActionGate(cfg.channels?.matrix?.actions);
if (reactionActions.has(action)) { if (reactionActions.has(action)) {
@ -54,13 +55,14 @@ export async function handleMatrixAction(
if (remove || isEmpty) { if (remove || isEmpty) {
const result = await removeMatrixReactions(roomId, messageId, { const result = await removeMatrixReactions(roomId, messageId, {
emoji: remove ? emoji : undefined, emoji: remove ? emoji : undefined,
accountId,
}); });
return jsonResult({ ok: true, removed: result.removed }); return jsonResult({ ok: true, removed: result.removed });
} }
await reactMatrixMessage(roomId, messageId, emoji); await reactMatrixMessage(roomId, messageId, emoji, { accountId });
return jsonResult({ ok: true, added: emoji }); return jsonResult({ ok: true, added: emoji });
} }
const reactions = await listMatrixReactions(roomId, messageId); const reactions = await listMatrixReactions(roomId, messageId, { accountId });
return jsonResult({ ok: true, reactions }); return jsonResult({ ok: true, reactions });
} }
@ -82,6 +84,7 @@ export async function handleMatrixAction(
mediaUrl: mediaUrl ?? undefined, mediaUrl: mediaUrl ?? undefined,
replyToId: replyToId ?? undefined, replyToId: replyToId ?? undefined,
threadId: threadId ?? undefined, threadId: threadId ?? undefined,
accountId,
}); });
return jsonResult({ ok: true, result }); return jsonResult({ ok: true, result });
} }
@ -89,14 +92,14 @@ export async function handleMatrixAction(
const roomId = readRoomId(params); const roomId = readRoomId(params);
const messageId = readStringParam(params, "messageId", { required: true }); const messageId = readStringParam(params, "messageId", { required: true });
const content = readStringParam(params, "content", { required: true }); const content = readStringParam(params, "content", { required: true });
const result = await editMatrixMessage(roomId, messageId, content); const result = await editMatrixMessage(roomId, messageId, content, { accountId });
return jsonResult({ ok: true, result }); return jsonResult({ ok: true, result });
} }
case "deleteMessage": { case "deleteMessage": {
const roomId = readRoomId(params); const roomId = readRoomId(params);
const messageId = readStringParam(params, "messageId", { required: true }); const messageId = readStringParam(params, "messageId", { required: true });
const reason = readStringParam(params, "reason"); const reason = readStringParam(params, "reason");
await deleteMatrixMessage(roomId, messageId, { reason: reason ?? undefined }); await deleteMatrixMessage(roomId, messageId, { reason: reason ?? undefined, accountId });
return jsonResult({ ok: true, deleted: true }); return jsonResult({ ok: true, deleted: true });
} }
case "readMessages": { case "readMessages": {
@ -108,6 +111,7 @@ export async function handleMatrixAction(
limit: limit ?? undefined, limit: limit ?? undefined,
before: before ?? undefined, before: before ?? undefined,
after: after ?? undefined, after: after ?? undefined,
accountId,
}); });
return jsonResult({ ok: true, ...result }); return jsonResult({ ok: true, ...result });
} }
@ -123,15 +127,15 @@ export async function handleMatrixAction(
const roomId = readRoomId(params); const roomId = readRoomId(params);
if (action === "pinMessage") { if (action === "pinMessage") {
const messageId = readStringParam(params, "messageId", { required: true }); const messageId = readStringParam(params, "messageId", { required: true });
const result = await pinMatrixMessage(roomId, messageId); const result = await pinMatrixMessage(roomId, messageId, { accountId });
return jsonResult({ ok: true, pinned: result.pinned }); return jsonResult({ ok: true, pinned: result.pinned });
} }
if (action === "unpinMessage") { if (action === "unpinMessage") {
const messageId = readStringParam(params, "messageId", { required: true }); const messageId = readStringParam(params, "messageId", { required: true });
const result = await unpinMatrixMessage(roomId, messageId); const result = await unpinMatrixMessage(roomId, messageId, { accountId });
return jsonResult({ ok: true, pinned: result.pinned }); return jsonResult({ ok: true, pinned: result.pinned });
} }
const result = await listMatrixPins(roomId); const result = await listMatrixPins(roomId, { accountId });
return jsonResult({ ok: true, pinned: result.pinned, events: result.events }); return jsonResult({ ok: true, pinned: result.pinned, events: result.events });
} }
@ -143,6 +147,7 @@ export async function handleMatrixAction(
const roomId = readStringParam(params, "roomId") ?? readStringParam(params, "channelId"); const roomId = readStringParam(params, "roomId") ?? readStringParam(params, "channelId");
const result = await getMatrixMemberInfo(userId, { const result = await getMatrixMemberInfo(userId, {
roomId: roomId ?? undefined, roomId: roomId ?? undefined,
accountId,
}); });
return jsonResult({ ok: true, member: result }); return jsonResult({ ok: true, member: result });
} }
@ -152,7 +157,7 @@ export async function handleMatrixAction(
throw new Error("Matrix room info is disabled."); throw new Error("Matrix room info is disabled.");
} }
const roomId = readRoomId(params); const roomId = readRoomId(params);
const result = await getMatrixRoomInfo(roomId); const result = await getMatrixRoomInfo(roomId, { accountId });
return jsonResult({ ok: true, room: result }); return jsonResult({ ok: true, room: result });
} }