diff --git a/extensions/matrix/src/actions.ts b/extensions/matrix/src/actions.ts index a7c219536..ce1059233 100644 --- a/extensions/matrix/src/actions.ts +++ b/extensions/matrix/src/actions.ts @@ -7,16 +7,14 @@ import { type ChannelMessageActionName, type ChannelToolSend, } from "openclaw/plugin-sdk"; -import type { CoreConfig } from "./types.js"; import { resolveMatrixAccount } from "./matrix/accounts.js"; import { handleMatrixAction } from "./tool-actions.js"; +import type { CoreConfig } from "./types.js"; export const matrixMessageActions: ChannelMessageActionAdapter = { listActions: ({ cfg }) => { const account = resolveMatrixAccount({ cfg: cfg as CoreConfig }); - if (!account.enabled || !account.configured) { - return []; - } + if (!account.enabled || !account.configured) return []; const gate = createActionGate((cfg as CoreConfig).channels?.matrix?.actions); const actions = new Set(["send", "poll"]); if (gate("reactions")) { @@ -33,28 +31,23 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { actions.add("unpin"); actions.add("list-pins"); } - if (gate("memberInfo")) { - actions.add("member-info"); - } - if (gate("channelInfo")) { - actions.add("channel-info"); - } + if (gate("memberInfo")) actions.add("member-info"); + if (gate("channelInfo")) actions.add("channel-info"); return Array.from(actions); }, supportsAction: ({ action }) => action !== "poll", extractToolSend: ({ args }): ChannelToolSend | null => { const action = typeof args.action === "string" ? args.action.trim() : ""; - if (action !== "sendMessage") { - return null; - } + if (action !== "sendMessage") return null; const to = typeof args.to === "string" ? args.to : undefined; - if (!to) { - return null; - } + if (!to) return null; return { to }; }, handleAction: async (ctx: ChannelMessageActionContext) => { const { action, params, cfg } = ctx; + // Get accountId from context for multi-account support + const accountId = (ctx as { accountId?: string }).accountId ?? undefined; + const resolveRoomId = () => readStringParam(params, "roomId") ?? readStringParam(params, "channelId") ?? @@ -77,6 +70,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { mediaUrl: mediaUrl ?? undefined, replyToId: replyTo ?? undefined, threadId: threadId ?? undefined, + accountId, }, cfg, ); @@ -93,6 +87,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { messageId, emoji, remove, + accountId, }, cfg, ); @@ -107,6 +102,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { roomId: resolveRoomId(), messageId, limit, + accountId, }, cfg, ); @@ -121,6 +117,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { limit, before: readStringParam(params, "before"), after: readStringParam(params, "after"), + accountId, }, cfg, ); @@ -135,6 +132,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { roomId: resolveRoomId(), messageId, content, + accountId, }, cfg, ); @@ -147,6 +145,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { action: "deleteMessage", roomId: resolveRoomId(), messageId, + accountId, }, cfg, ); @@ -163,6 +162,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins", roomId: resolveRoomId(), messageId, + accountId, }, cfg, ); @@ -175,6 +175,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { action: "memberInfo", userId, roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"), + accountId, }, cfg, ); @@ -185,6 +186,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = { { action: "channelInfo", roomId: resolveRoomId(), + accountId, }, cfg, ); diff --git a/extensions/matrix/src/channel.directory.test.ts b/extensions/matrix/src/channel.directory.test.ts index eb2aeacac..86b547833 100644 --- a/extensions/matrix/src/channel.directory.test.ts +++ b/extensions/matrix/src/channel.directory.test.ts @@ -1,6 +1,8 @@ -import type { PluginRuntime } from "openclaw/plugin-sdk"; import { beforeEach, describe, expect, it } from "vitest"; + +import type { PluginRuntime } from "openclaw/plugin-sdk"; import type { CoreConfig } from "./types.js"; + import { matrixPlugin } from "./channel.js"; import { setMatrixRuntime } from "./runtime.js"; @@ -32,12 +34,7 @@ describe("matrix directory", () => { expect(matrixPlugin.directory?.listGroups).toBeTruthy(); await expect( - matrixPlugin.directory!.listPeers({ - cfg, - accountId: undefined, - query: undefined, - limit: undefined, - }), + matrixPlugin.directory!.listPeers({ cfg, accountId: undefined, query: undefined, limit: undefined }), ).resolves.toEqual( expect.arrayContaining([ { kind: "user", id: "user:@alice:example.org" }, @@ -48,12 +45,7 @@ describe("matrix directory", () => { ); await expect( - matrixPlugin.directory!.listGroups({ - cfg, - accountId: undefined, - query: undefined, - limit: undefined, - }), + matrixPlugin.directory!.listGroups({ cfg, accountId: undefined, query: undefined, limit: undefined }), ).resolves.toEqual( expect.arrayContaining([ { kind: "group", id: "room:!room1:example.org" }, diff --git a/extensions/matrix/src/channel.ts b/extensions/matrix/src/channel.ts index eb67c49ce..6b7ac5029 100644 --- a/extensions/matrix/src/channel.ts +++ b/extensions/matrix/src/channel.ts @@ -9,14 +9,11 @@ import { setAccountEnabledInConfigSection, type ChannelPlugin, } from "openclaw/plugin-sdk"; -import type { CoreConfig } from "./types.js"; + import { matrixMessageActions } from "./actions.js"; import { MatrixConfigSchema } from "./config-schema.js"; -import { listMatrixDirectoryGroupsLive, listMatrixDirectoryPeersLive } from "./directory-live.js"; -import { - resolveMatrixGroupRequireMention, - resolveMatrixGroupToolPolicy, -} from "./group-mentions.js"; +import { resolveMatrixGroupRequireMention, resolveMatrixGroupToolPolicy } from "./group-mentions.js"; +import type { CoreConfig } from "./types.js"; import { listMatrixAccountIds, resolveDefaultMatrixAccountId, @@ -24,12 +21,17 @@ import { type ResolvedMatrixAccount, } from "./matrix/accounts.js"; import { resolveMatrixAuth } from "./matrix/client.js"; +import { importMatrixIndex } from "./matrix/import-mutex.js"; import { normalizeAllowListLower } from "./matrix/monitor/allowlist.js"; import { probeMatrix } from "./matrix/probe.js"; import { sendMessageMatrix } from "./matrix/send.js"; import { matrixOnboardingAdapter } from "./onboarding.js"; import { matrixOutbound } from "./outbound.js"; import { resolveMatrixTargets } from "./resolve-targets.js"; +import { + listMatrixDirectoryGroupsLive, + listMatrixDirectoryPeersLive, +} from "./directory-live.js"; const meta = { id: "matrix", @@ -44,9 +46,7 @@ const meta = { function normalizeMatrixMessagingTarget(raw: string): string | undefined { let normalized = raw.trim(); - if (!normalized) { - return undefined; - } + if (!normalized) return undefined; const lowered = normalized.toLowerCase(); if (lowered.startsWith("matrix:")) { normalized = normalized.slice("matrix:".length).trim(); @@ -109,7 +109,8 @@ export const matrixPlugin: ChannelPlugin = { configSchema: buildChannelConfigSchema(MatrixConfigSchema), config: { listAccountIds: (cfg) => listMatrixAccountIds(cfg as CoreConfig), - resolveAccount: (cfg, accountId) => resolveMatrixAccount({ cfg: cfg as CoreConfig, accountId }), + resolveAccount: (cfg, accountId) => + resolveMatrixAccount({ cfg: cfg as CoreConfig, accountId }), defaultAccountId: (cfg) => resolveDefaultMatrixAccountId(cfg as CoreConfig), setAccountEnabled: ({ cfg, accountId, enabled }) => setAccountEnabledInConfigSection({ @@ -153,20 +154,15 @@ export const matrixPlugin: ChannelPlugin = { policyPath: "channels.matrix.dm.policy", allowFromPath: "channels.matrix.dm.allowFrom", approveHint: formatPairingApproveHint("matrix"), - normalizeEntry: (raw) => - raw - .replace(/^matrix:/i, "") - .trim() - .toLowerCase(), + normalizeEntry: (raw) => raw.replace(/^matrix:/i, "").trim().toLowerCase(), }), collectWarnings: ({ account, cfg }) => { const defaultGroupPolicy = (cfg as CoreConfig).channels?.defaults?.groupPolicy; - const groupPolicy = account.config.groupPolicy ?? defaultGroupPolicy ?? "allowlist"; - if (groupPolicy !== "open") { - return []; - } + const groupPolicy = + account.config.groupPolicy ?? defaultGroupPolicy ?? "allowlist"; + if (groupPolicy !== "open") return []; return [ - '- Matrix rooms: groupPolicy="open" allows any room to trigger (mention-gated). Set channels.matrix.groupPolicy="allowlist" + channels.matrix.groups (and optionally channels.matrix.groupAllowFrom) to restrict rooms.', + "- Matrix rooms: groupPolicy=\"open\" allows any room to trigger (mention-gated). Set channels.matrix.groupPolicy=\"allowlist\" + channels.matrix.groups (and optionally channels.matrix.groupAllowFrom) to restrict rooms.", ]; }, }, @@ -175,13 +171,16 @@ export const matrixPlugin: ChannelPlugin = { resolveToolPolicy: resolveMatrixGroupToolPolicy, }, threading: { - resolveReplyToMode: ({ cfg }) => (cfg as CoreConfig).channels?.matrix?.replyToMode ?? "off", + resolveReplyToMode: ({ cfg }) => + (cfg as CoreConfig).channels?.matrix?.replyToMode ?? "off", buildToolContext: ({ context, hasRepliedRef }) => { const currentTarget = context.To; return { currentChannelId: currentTarget?.trim() || undefined, currentThreadTs: - context.MessageThreadId != null ? String(context.MessageThreadId) : context.ReplyToId, + context.MessageThreadId != null + ? String(context.MessageThreadId) + : context.ReplyToId, hasRepliedRef, }; }, @@ -191,12 +190,8 @@ export const matrixPlugin: ChannelPlugin = { targetResolver: { looksLikeId: (raw) => { const trimmed = raw.trim(); - if (!trimmed) { - return false; - } - if (/^(matrix:)?[!#@]/i.test(trimmed)) { - return true; - } + if (!trimmed) return false; + if (/^(matrix:)?[!#@]/i.test(trimmed)) return true; return trimmed.includes(":"); }, hint: "", @@ -211,17 +206,13 @@ export const matrixPlugin: ChannelPlugin = { for (const entry of account.config.dm?.allowFrom ?? []) { const raw = String(entry).trim(); - if (!raw || raw === "*") { - continue; - } + if (!raw || raw === "*") continue; ids.add(raw.replace(/^matrix:/i, "")); } for (const entry of account.config.groupAllowFrom ?? []) { const raw = String(entry).trim(); - if (!raw || raw === "*") { - continue; - } + if (!raw || raw === "*") continue; ids.add(raw.replace(/^matrix:/i, "")); } @@ -229,9 +220,7 @@ export const matrixPlugin: ChannelPlugin = { for (const room of Object.values(groups)) { for (const entry of room.users ?? []) { const raw = String(entry).trim(); - if (!raw || raw === "*") { - continue; - } + if (!raw || raw === "*") continue; ids.add(raw.replace(/^matrix:/i, "")); } } @@ -242,9 +231,7 @@ export const matrixPlugin: ChannelPlugin = { .map((raw) => { const lowered = raw.toLowerCase(); const cleaned = lowered.startsWith("user:") ? raw.slice("user:".length).trim() : raw; - if (cleaned.startsWith("@")) { - return `user:${cleaned}`; - } + if (cleaned.startsWith("@")) return `user:${cleaned}`; return cleaned; }) .filter((id) => (q ? id.toLowerCase().includes(q) : true)) @@ -269,12 +256,8 @@ export const matrixPlugin: ChannelPlugin = { .map((raw) => raw.replace(/^matrix:/i, "")) .map((raw) => { const lowered = raw.toLowerCase(); - if (lowered.startsWith("room:") || lowered.startsWith("channel:")) { - return raw; - } - if (raw.startsWith("!")) { - return `room:${raw}`; - } + if (lowered.startsWith("room:") || lowered.startsWith("channel:")) return raw; + if (raw.startsWith("!")) return `room:${raw}`; return raw; }) .filter((id) => (q ? id.toLowerCase().includes(q) : true)) @@ -302,12 +285,8 @@ export const matrixPlugin: ChannelPlugin = { name, }), validateInput: ({ input }) => { - if (input.useEnv) { - return null; - } - if (!input.homeserver?.trim()) { - return "Matrix requires --homeserver"; - } + if (input.useEnv) return null; + if (!input.homeserver?.trim()) return "Matrix requires --homeserver"; const accessToken = input.accessToken?.trim(); const password = input.password?.trim(); const userId = input.userId?.trim(); @@ -315,12 +294,8 @@ export const matrixPlugin: ChannelPlugin = { return "Matrix requires --access-token or --password"; } if (!accessToken) { - if (!userId) { - return "Matrix requires --user-id when using --password"; - } - if (!password) { - return "Matrix requires --password when using --user-id"; - } + if (!userId) return "Matrix requires --user-id when using --password"; + if (!password) return "Matrix requires --password when using --user-id"; } return null; }, @@ -365,9 +340,7 @@ export const matrixPlugin: ChannelPlugin = { collectStatusIssues: (accounts) => accounts.flatMap((account) => { const lastError = typeof account.lastError === "string" ? account.lastError.trim() : ""; - if (!lastError) { - return []; - } + if (!lastError) return []; return [ { channel: "matrix", @@ -387,7 +360,7 @@ export const matrixPlugin: ChannelPlugin = { probe: snapshot.probe, lastProbeAt: snapshot.lastProbeAt ?? null, }), - probeAccount: async ({ timeoutMs, cfg }) => { + probeAccount: async ({ account, timeoutMs, cfg }) => { try { const auth = await resolveMatrixAuth({ cfg: cfg as CoreConfig }); return await probeMatrix({ @@ -427,9 +400,12 @@ export const matrixPlugin: ChannelPlugin = { accountId: account.accountId, baseUrl: account.homeserver, }); - ctx.log?.info(`[${account.accountId}] starting provider (${account.homeserver ?? "matrix"})`); + ctx.log?.info( + `[${account.accountId}] starting provider (${account.homeserver ?? "matrix"})`, + ); // Lazy import: the monitor pulls the reply pipeline; avoid ESM init cycles. - const { monitorMatrixProvider } = await import("./matrix/index.js"); + // Use serialized import to prevent race conditions during parallel account startup. + const { monitorMatrixProvider } = await importMatrixIndex(); return monitorMatrixProvider({ runtime: ctx.runtime, abortSignal: ctx.abortSignal, diff --git a/extensions/matrix/src/directory-live.ts b/extensions/matrix/src/directory-live.ts index e43a7c099..67fc5e563 100644 --- a/extensions/matrix/src/directory-live.ts +++ b/extensions/matrix/src/directory-live.ts @@ -1,4 +1,5 @@ import type { ChannelDirectoryEntry } from "openclaw/plugin-sdk"; + import { resolveMatrixAuth } from "./matrix/client.js"; type MatrixUserResult = { @@ -54,9 +55,7 @@ export async function listMatrixDirectoryPeersLive(params: { limit?: number | null; }): Promise { const query = normalizeQuery(params.query); - if (!query) { - return []; - } + if (!query) return []; const auth = await resolveMatrixAuth({ cfg: params.cfg as never }); const res = await fetchMatrixJson({ homeserver: auth.homeserver, @@ -72,9 +71,7 @@ export async function listMatrixDirectoryPeersLive(params: { return results .map((entry) => { const userId = entry.user_id?.trim(); - if (!userId) { - return null; - } + if (!userId) return null; return { kind: "user", id: userId, @@ -126,17 +123,13 @@ export async function listMatrixDirectoryGroupsLive(params: { limit?: number | null; }): Promise { const query = normalizeQuery(params.query); - if (!query) { - return []; - } + if (!query) return []; const auth = await resolveMatrixAuth({ cfg: params.cfg as never }); const limit = typeof params.limit === "number" && params.limit > 0 ? params.limit : 20; if (query.startsWith("#")) { const roomId = await resolveMatrixRoomAlias(auth.homeserver, auth.accessToken, query); - if (!roomId) { - return []; - } + if (!roomId) return []; return [ { kind: "group", @@ -167,21 +160,15 @@ export async function listMatrixDirectoryGroupsLive(params: { for (const roomId of rooms) { const name = await fetchMatrixRoomName(auth.homeserver, auth.accessToken, roomId); - if (!name) { - continue; - } - if (!name.toLowerCase().includes(query)) { - continue; - } + if (!name) continue; + if (!name.toLowerCase().includes(query)) continue; results.push({ kind: "group", id: roomId, name, handle: `#${name}`, }); - if (results.length >= limit) { - break; - } + if (results.length >= limit) break; } return results; diff --git a/extensions/matrix/src/group-mentions.ts b/extensions/matrix/src/group-mentions.ts index d5b970021..05d63ce2d 100644 --- a/extensions/matrix/src/group-mentions.ts +++ b/extensions/matrix/src/group-mentions.ts @@ -1,6 +1,7 @@ import type { ChannelGroupContext, GroupToolPolicyConfig } from "openclaw/plugin-sdk"; -import type { CoreConfig } from "./types.js"; + import { resolveMatrixRoomConfig } from "./matrix/monitor/rooms.js"; +import type { CoreConfig } from "./types.js"; export function resolveMatrixGroupRequireMention(params: ChannelGroupContext): boolean { const rawGroupId = params.groupId?.trim() ?? ""; @@ -25,15 +26,9 @@ export function resolveMatrixGroupRequireMention(params: ChannelGroupContext): b name: groupChannel || undefined, }).config; if (resolved) { - if (resolved.autoReply === true) { - return false; - } - if (resolved.autoReply === false) { - return true; - } - if (typeof resolved.requireMention === "boolean") { - return resolved.requireMention; - } + if (resolved.autoReply === true) return false; + if (resolved.autoReply === false) return true; + if (typeof resolved.requireMention === "boolean") return resolved.requireMention; } return true; } diff --git a/extensions/matrix/src/matrix/accounts.test.ts b/extensions/matrix/src/matrix/accounts.test.ts index d45368475..2f1cfdb10 100644 --- a/extensions/matrix/src/matrix/accounts.test.ts +++ b/extensions/matrix/src/matrix/accounts.test.ts @@ -1,4 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + import type { CoreConfig } from "../types.js"; import { resolveMatrixAccount } from "./accounts.js"; diff --git a/extensions/matrix/src/matrix/accounts.ts b/extensions/matrix/src/matrix/accounts.ts index 99593b8a3..f761732af 100644 --- a/extensions/matrix/src/matrix/accounts.ts +++ b/extensions/matrix/src/matrix/accounts.ts @@ -1,5 +1,5 @@ import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "openclaw/plugin-sdk"; -import type { CoreConfig, MatrixConfig } from "../types.js"; +import type { CoreConfig, MatrixAccountConfig, MatrixConfig } from "../types.js"; import { resolveMatrixConfig } from "./client.js"; import { credentialsMatchConfig, loadMatrixCredentials } from "./credentials.js"; @@ -10,56 +10,155 @@ export type ResolvedMatrixAccount = { configured: boolean; homeserver?: string; userId?: string; - config: MatrixConfig; + accessToken?: string; + config: MatrixAccountConfig; }; -export function listMatrixAccountIds(_cfg: CoreConfig): string[] { - return [DEFAULT_ACCOUNT_ID]; +/** + * List account IDs explicitly configured in channels.matrix.accounts + */ +function listConfiguredAccountIds(cfg: CoreConfig): string[] { + const accounts = cfg.channels?.matrix?.accounts; + if (!accounts || typeof accounts !== "object") { + return []; + } + const ids = new Set(); + for (const key of Object.keys(accounts)) { + if (!key) continue; + ids.add(normalizeAccountId(key)); + } + return [...ids]; +} + +/** + * List account IDs referenced in bindings for matrix channel + */ +function listBoundAccountIds(cfg: CoreConfig): string[] { + const bindings = cfg.bindings; + if (!Array.isArray(bindings)) return []; + const ids = new Set(); + for (const binding of bindings) { + if (binding.match?.channel === "matrix" && binding.match?.accountId) { + ids.add(normalizeAccountId(binding.match.accountId)); + } + } + return [...ids]; +} + +/** + * List all Matrix account IDs (configured + bound) + */ +export function listMatrixAccountIds(cfg: CoreConfig): string[] { + const ids = Array.from( + new Set([ + DEFAULT_ACCOUNT_ID, + ...listConfiguredAccountIds(cfg), + ...listBoundAccountIds(cfg), + ]), + ); + return ids.toSorted((a, b) => a.localeCompare(b)); } export function resolveDefaultMatrixAccountId(cfg: CoreConfig): string { const ids = listMatrixAccountIds(cfg); - if (ids.includes(DEFAULT_ACCOUNT_ID)) { - return DEFAULT_ACCOUNT_ID; - } + if (ids.includes(DEFAULT_ACCOUNT_ID)) return DEFAULT_ACCOUNT_ID; return ids[0] ?? DEFAULT_ACCOUNT_ID; } +/** + * Get account-specific config from channels.matrix.accounts[accountId] + */ +function resolveAccountConfig( + cfg: CoreConfig, + accountId: string, +): MatrixAccountConfig | undefined { + const accounts = cfg.channels?.matrix?.accounts; + if (!accounts || typeof accounts !== "object") { + return undefined; + } + const direct = accounts[accountId] as MatrixAccountConfig | undefined; + if (direct) return direct; + + const normalized = normalizeAccountId(accountId); + const matchKey = Object.keys(accounts).find( + (key) => normalizeAccountId(key) === normalized + ); + return matchKey ? (accounts[matchKey] as MatrixAccountConfig | undefined) : undefined; +} + +/** + * Merge base matrix config with account-specific overrides + */ +function mergeMatrixAccountConfig(cfg: CoreConfig, accountId: string): MatrixAccountConfig { + const base = cfg.channels?.matrix ?? {}; + // Extract base config without 'accounts' key + const { accounts: _ignored, ...baseConfig } = base as MatrixConfig; + const accountConfig = resolveAccountConfig(cfg, accountId) ?? {}; + + // Account config overrides base config + return { ...baseConfig, ...accountConfig }; +} + export function resolveMatrixAccount(params: { cfg: CoreConfig; accountId?: string | null; }): ResolvedMatrixAccount { const accountId = normalizeAccountId(params.accountId); - const base = params.cfg.channels?.matrix ?? {}; - const enabled = base.enabled !== false; - const resolved = resolveMatrixConfig(params.cfg, process.env); - const hasHomeserver = Boolean(resolved.homeserver); - const hasUserId = Boolean(resolved.userId); - const hasAccessToken = Boolean(resolved.accessToken); - const hasPassword = Boolean(resolved.password); + const merged = mergeMatrixAccountConfig(params.cfg, accountId); + + // Check if this is a non-default account - use account-specific auth + const isDefaultAccount = accountId === DEFAULT_ACCOUNT_ID || accountId === "default"; + + // For non-default accounts, use account-specific credentials + // For default account, use base config or env + let homeserver = merged.homeserver; + let userId = merged.userId; + let accessToken = merged.accessToken; + + if (isDefaultAccount) { + // Default account can fall back to env vars + const resolved = resolveMatrixConfig(params.cfg, process.env); + homeserver = homeserver || resolved.homeserver; + userId = userId || resolved.userId; + accessToken = accessToken || resolved.accessToken; + } + + const baseEnabled = params.cfg.channels?.matrix?.enabled !== false; + const accountEnabled = merged.enabled !== false; + const enabled = baseEnabled && accountEnabled; + + const hasHomeserver = Boolean(homeserver); + const hasAccessToken = Boolean(accessToken); + const hasPassword = Boolean(merged.password); + const hasUserId = Boolean(userId); const hasPasswordAuth = hasUserId && hasPassword; - const stored = loadMatrixCredentials(process.env); + + // Check for stored credentials (only for default account) + const stored = isDefaultAccount ? loadMatrixCredentials(process.env) : null; const hasStored = - stored && resolved.homeserver + stored && homeserver ? credentialsMatchConfig(stored, { - homeserver: resolved.homeserver, - userId: resolved.userId || "", + homeserver: homeserver, + userId: userId || "", }) : false; + const configured = hasHomeserver && (hasAccessToken || hasPasswordAuth || Boolean(hasStored)); + return { accountId, enabled, - name: base.name?.trim() || undefined, + name: merged.name?.trim() || undefined, configured, - homeserver: resolved.homeserver || undefined, - userId: resolved.userId || undefined, - config: base, + homeserver: homeserver || undefined, + userId: userId || undefined, + accessToken: accessToken || undefined, + config: merged, }; } export function listEnabledMatrixAccounts(cfg: CoreConfig): ResolvedMatrixAccount[] { return listMatrixAccountIds(cfg) .map((accountId) => resolveMatrixAccount({ cfg, accountId })) - .filter((account) => account.enabled); + .filter((account) => account.enabled && account.configured); } diff --git a/extensions/matrix/src/matrix/actions/client.ts b/extensions/matrix/src/matrix/actions/client.ts index d9fe477db..88705fd04 100644 --- a/extensions/matrix/src/matrix/actions/client.ts +++ b/extensions/matrix/src/matrix/actions/client.ts @@ -1,6 +1,5 @@ -import type { CoreConfig } from "../types.js"; -import type { MatrixActionClient, MatrixActionClientOpts } from "./types.js"; import { getMatrixRuntime } from "../../runtime.js"; +import type { CoreConfig } from "../types.js"; import { getActiveMatrixClient } from "../active-client.js"; import { createMatrixClient, @@ -8,6 +7,7 @@ import { resolveMatrixAuth, resolveSharedMatrixClient, } from "../client.js"; +import type { MatrixActionClient, MatrixActionClientOpts } from "./types.js"; export function ensureNodeRuntime() { if (isBunRuntime()) { @@ -19,23 +19,24 @@ export async function resolveActionClient( opts: MatrixActionClientOpts = {}, ): Promise { ensureNodeRuntime(); - if (opts.client) { - return { client: opts.client, stopOnDone: false }; - } - const active = getActiveMatrixClient(); - if (active) { - return { client: active, stopOnDone: false }; - } + if (opts.client) return { client: opts.client, stopOnDone: false }; + + // Try to get the active client for the specified account + const active = getActiveMatrixClient(opts.accountId); + if (active) return { client: active, stopOnDone: false }; + const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT); if (shouldShareClient) { const client = await resolveSharedMatrixClient({ cfg: getMatrixRuntime().config.loadConfig() as CoreConfig, timeoutMs: opts.timeoutMs, + accountId: opts.accountId, }); return { client, stopOnDone: false }; } const auth = await resolveMatrixAuth({ cfg: getMatrixRuntime().config.loadConfig() as CoreConfig, + accountId: opts.accountId ?? undefined, }); const client = await createMatrixClient({ homeserver: auth.homeserver, @@ -43,6 +44,7 @@ export async function resolveActionClient( accessToken: auth.accessToken, encryption: auth.encryption, localTimeoutMs: opts.timeoutMs, + accountId: opts.accountId ?? undefined, }); if (auth.encryption && client.crypto) { try { diff --git a/extensions/matrix/src/matrix/actions/messages.ts b/extensions/matrix/src/matrix/actions/messages.ts index d9cfe3722..2bc32e849 100644 --- a/extensions/matrix/src/matrix/actions/messages.ts +++ b/extensions/matrix/src/matrix/actions/messages.ts @@ -1,6 +1,3 @@ -import { resolveMatrixRoomId, sendMessageMatrix } from "../send.js"; -import { resolveActionClient } from "./client.js"; -import { summarizeMatrixRawEvent } from "./summary.js"; import { EventType, MsgType, @@ -10,6 +7,9 @@ import { type MatrixRawEvent, type RoomMessageEventContent, } from "./types.js"; +import { resolveActionClient } from "./client.js"; +import { summarizeMatrixRawEvent } from "./summary.js"; +import { resolveMatrixRoomId, sendMessageMatrix } from "../send.js"; export async function sendMatrixMessage( to: string, @@ -26,6 +26,7 @@ export async function sendMatrixMessage( threadId: opts.threadId, client: opts.client, timeoutMs: opts.timeoutMs, + accountId: opts.accountId, }); } @@ -36,9 +37,7 @@ export async function editMatrixMessage( opts: MatrixActionClientOpts = {}, ) { const trimmed = content.trim(); - if (!trimmed) { - throw new Error("Matrix edit requires content"); - } + if (!trimmed) throw new Error("Matrix edit requires content"); const { client, stopOnDone } = await resolveActionClient(opts); try { const resolvedRoom = await resolveMatrixRoomId(client, roomId); @@ -58,9 +57,7 @@ export async function editMatrixMessage( const eventId = await client.sendMessage(resolvedRoom, payload); return { eventId: eventId ?? null }; } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } @@ -74,9 +71,7 @@ export async function deleteMatrixMessage( const resolvedRoom = await resolveMatrixRoomId(client, roomId); await client.redactEvent(resolvedRoom, messageId, opts.reason); } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } @@ -102,7 +97,7 @@ export async function readMatrixMessages( const token = opts.before?.trim() || opts.after?.trim() || undefined; const dir = opts.after ? "f" : "b"; // @vector-im/matrix-bot-sdk uses doRequest for room messages - const res = (await client.doRequest( + const res = await client.doRequest( "GET", `/_matrix/client/v3/rooms/${encodeURIComponent(resolvedRoom)}/messages`, { @@ -110,7 +105,7 @@ export async function readMatrixMessages( limit, from: token, }, - )) as { chunk: MatrixRawEvent[]; start?: string; end?: string }; + ) as { chunk: MatrixRawEvent[]; start?: string; end?: string }; const messages = res.chunk .filter((event) => event.type === EventType.RoomMessage) .filter((event) => !event.unsigned?.redacted_because) @@ -121,8 +116,6 @@ export async function readMatrixMessages( prevBatch: res.start ?? null, }; } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } diff --git a/extensions/matrix/src/matrix/actions/pins.ts b/extensions/matrix/src/matrix/actions/pins.ts index 7d466db65..a29dfba45 100644 --- a/extensions/matrix/src/matrix/actions/pins.ts +++ b/extensions/matrix/src/matrix/actions/pins.ts @@ -1,12 +1,12 @@ -import { resolveMatrixRoomId } from "../send.js"; -import { resolveActionClient } from "./client.js"; -import { fetchEventSummary, readPinnedEvents } from "./summary.js"; import { EventType, type MatrixActionClientOpts, type MatrixMessageSummary, type RoomPinnedEventsEventContent, } from "./types.js"; +import { resolveActionClient } from "./client.js"; +import { fetchEventSummary, readPinnedEvents } from "./summary.js"; +import { resolveMatrixRoomId } from "../send.js"; export async function pinMatrixMessage( roomId: string, @@ -22,9 +22,7 @@ export async function pinMatrixMessage( await client.sendStateEvent(resolvedRoom, EventType.RoomPinnedEvents, "", payload); return { pinned: next }; } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } @@ -42,9 +40,7 @@ export async function unpinMatrixMessage( await client.sendStateEvent(resolvedRoom, EventType.RoomPinnedEvents, "", payload); return { pinned: next }; } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } @@ -69,8 +65,6 @@ export async function listMatrixPins( ).filter((event): event is MatrixMessageSummary => Boolean(event)); return { pinned, events }; } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } diff --git a/extensions/matrix/src/matrix/actions/reactions.ts b/extensions/matrix/src/matrix/actions/reactions.ts index fe8023960..044ef46c5 100644 --- a/extensions/matrix/src/matrix/actions/reactions.ts +++ b/extensions/matrix/src/matrix/actions/reactions.ts @@ -1,5 +1,3 @@ -import { resolveMatrixRoomId } from "../send.js"; -import { resolveActionClient } from "./client.js"; import { EventType, RelationType, @@ -8,6 +6,8 @@ import { type MatrixReactionSummary, type ReactionEventContent, } from "./types.js"; +import { resolveActionClient } from "./client.js"; +import { resolveMatrixRoomId } from "../send.js"; export async function listMatrixReactions( roomId: string, @@ -22,18 +22,16 @@ export async function listMatrixReactions( ? Math.max(1, Math.floor(opts.limit)) : 100; // @vector-im/matrix-bot-sdk uses doRequest for relations - const res = (await client.doRequest( + const res = await client.doRequest( "GET", `/_matrix/client/v1/rooms/${encodeURIComponent(resolvedRoom)}/relations/${encodeURIComponent(messageId)}/${RelationType.Annotation}/${EventType.Reaction}`, { dir: "b", limit }, - )) as { chunk: MatrixRawEvent[] }; + ) as { chunk: MatrixRawEvent[] }; const summaries = new Map(); for (const event of res.chunk) { const content = event.content as ReactionEventContent; const key = content["m.relates_to"]?.key; - if (!key) { - continue; - } + if (!key) continue; const sender = event.sender ?? ""; const entry: MatrixReactionSummary = summaries.get(key) ?? { key, @@ -48,9 +46,7 @@ export async function listMatrixReactions( } return Array.from(summaries.values()); } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } @@ -62,35 +58,27 @@ export async function removeMatrixReactions( const { client, stopOnDone } = await resolveActionClient(opts); try { const resolvedRoom = await resolveMatrixRoomId(client, roomId); - const res = (await client.doRequest( + const res = await client.doRequest( "GET", `/_matrix/client/v1/rooms/${encodeURIComponent(resolvedRoom)}/relations/${encodeURIComponent(messageId)}/${RelationType.Annotation}/${EventType.Reaction}`, { dir: "b", limit: 200 }, - )) as { chunk: MatrixRawEvent[] }; + ) as { chunk: MatrixRawEvent[] }; const userId = await client.getUserId(); - if (!userId) { - return { removed: 0 }; - } + if (!userId) return { removed: 0 }; const targetEmoji = opts.emoji?.trim(); const toRemove = res.chunk .filter((event) => event.sender === userId) .filter((event) => { - if (!targetEmoji) { - return true; - } + if (!targetEmoji) return true; const content = event.content as ReactionEventContent; return content["m.relates_to"]?.key === targetEmoji; }) .map((event) => event.event_id) .filter((id): id is string => Boolean(id)); - if (toRemove.length === 0) { - return { removed: 0 }; - } + if (toRemove.length === 0) return { removed: 0 }; await Promise.all(toRemove.map((id) => client.redactEvent(resolvedRoom, id))); return { removed: toRemove.length }; } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } diff --git a/extensions/matrix/src/matrix/actions/room.ts b/extensions/matrix/src/matrix/actions/room.ts index e1770c7bc..68cf9b0a0 100644 --- a/extensions/matrix/src/matrix/actions/room.ts +++ b/extensions/matrix/src/matrix/actions/room.ts @@ -1,6 +1,6 @@ -import { resolveMatrixRoomId } from "../send.js"; -import { resolveActionClient } from "./client.js"; import { EventType, type MatrixActionClientOpts } from "./types.js"; +import { resolveActionClient } from "./client.js"; +import { resolveMatrixRoomId } from "../send.js"; export async function getMatrixMemberInfo( userId: string, @@ -25,13 +25,14 @@ export async function getMatrixMemberInfo( roomId: roomId ?? null, }; } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } -export async function getMatrixRoomInfo(roomId: string, opts: MatrixActionClientOpts = {}) { +export async function getMatrixRoomInfo( + roomId: string, + opts: MatrixActionClientOpts = {}, +) { const { client, stopOnDone } = await resolveActionClient(opts); try { const resolvedRoom = await resolveMatrixRoomId(client, roomId); @@ -56,7 +57,11 @@ export async function getMatrixRoomInfo(roomId: string, opts: MatrixActionClient } try { - const aliasState = await client.getRoomStateEvent(resolvedRoom, "m.room.canonical_alias", ""); + const aliasState = await client.getRoomStateEvent( + resolvedRoom, + "m.room.canonical_alias", + "", + ); canonicalAlias = aliasState?.alias ?? null; } catch { // ignore @@ -78,8 +83,6 @@ export async function getMatrixRoomInfo(roomId: string, opts: MatrixActionClient memberCount, }; } finally { - if (stopOnDone) { - client.stop(); - } + if (stopOnDone) client.stop(); } } diff --git a/extensions/matrix/src/matrix/actions/summary.ts b/extensions/matrix/src/matrix/actions/summary.ts index d200e9927..2fa2d27b3 100644 --- a/extensions/matrix/src/matrix/actions/summary.ts +++ b/extensions/matrix/src/matrix/actions/summary.ts @@ -1,4 +1,5 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; + import { EventType, type MatrixMessageSummary, @@ -37,7 +38,10 @@ export function summarizeMatrixRawEvent(event: MatrixRawEvent): MatrixMessageSum }; } -export async function readPinnedEvents(client: MatrixClient, roomId: string): Promise { +export async function readPinnedEvents( + client: MatrixClient, + roomId: string, +): Promise { try { const content = (await client.getRoomStateEvent( roomId, @@ -64,9 +68,7 @@ export async function fetchEventSummary( ): Promise { try { const raw = (await client.getEvent(roomId, eventId)) as MatrixRawEvent; - if (raw.unsigned?.redacted_because) { - return null; - } + if (raw.unsigned?.redacted_because) return null; return summarizeMatrixRawEvent(raw); } catch { // Event not found, redacted, or inaccessible - return null diff --git a/extensions/matrix/src/matrix/actions/types.ts b/extensions/matrix/src/matrix/actions/types.ts index 75fddbd9c..96694f4c7 100644 --- a/extensions/matrix/src/matrix/actions/types.ts +++ b/extensions/matrix/src/matrix/actions/types.ts @@ -57,6 +57,7 @@ export type MatrixRawEvent = { export type MatrixActionClientOpts = { client?: MatrixClient; timeoutMs?: number; + accountId?: string | null; }; export type MatrixMessageSummary = { diff --git a/extensions/matrix/src/matrix/active-client.ts b/extensions/matrix/src/matrix/active-client.ts index 5ff540926..eb2c41ca8 100644 --- a/extensions/matrix/src/matrix/active-client.ts +++ b/extensions/matrix/src/matrix/active-client.ts @@ -1,11 +1,34 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; -let activeClient: MatrixClient | null = null; +const DEFAULT_ACCOUNT_KEY = "default"; -export function setActiveMatrixClient(client: MatrixClient | null): void { - activeClient = client; +// Multi-account: Map of accountId -> client +const activeClients = new Map(); + +function normalizeAccountKey(accountId?: string | null): string { + return accountId?.trim().toLowerCase() || DEFAULT_ACCOUNT_KEY; } -export function getActiveMatrixClient(): MatrixClient | null { - return activeClient; +export function setActiveMatrixClient(client: MatrixClient | null, accountId?: string | null): void { + const key = normalizeAccountKey(accountId); + if (client) { + activeClients.set(key, client); + } else { + activeClients.delete(key); + } +} + +export function getActiveMatrixClient(accountId?: string | null): MatrixClient | null { + const key = normalizeAccountKey(accountId); + const client = activeClients.get(key); + if (client) return client; + // Fallback: if specific account not found, try default + if (key !== DEFAULT_ACCOUNT_KEY) { + return activeClients.get(DEFAULT_ACCOUNT_KEY) ?? null; + } + return null; +} + +export function listActiveMatrixClients(): Array<{ accountId: string; client: MatrixClient }> { + return Array.from(activeClients.entries()).map(([accountId, client]) => ({ accountId, client })); } diff --git a/extensions/matrix/src/matrix/client.test.ts b/extensions/matrix/src/matrix/client.test.ts index 69de112db..f806f9c81 100644 --- a/extensions/matrix/src/matrix/client.test.ts +++ b/extensions/matrix/src/matrix/client.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "vitest"; + import type { CoreConfig } from "../types.js"; import { resolveMatrixConfig } from "./client.js"; diff --git a/extensions/matrix/src/matrix/client.ts b/extensions/matrix/src/matrix/client.ts index 0d35cde2e..82c734d15 100644 --- a/extensions/matrix/src/matrix/client.ts +++ b/extensions/matrix/src/matrix/client.ts @@ -2,4 +2,8 @@ export type { MatrixAuth, MatrixResolvedConfig } from "./client/types.js"; export { isBunRuntime } from "./client/runtime.js"; export { resolveMatrixConfig, resolveMatrixAuth } from "./client/config.js"; export { createMatrixClient } from "./client/create-client.js"; -export { resolveSharedMatrixClient, waitForMatrixSync, stopSharedClient } from "./client/shared.js"; +export { + resolveSharedMatrixClient, + waitForMatrixSync, + stopSharedClient, +} from "./client/shared.js"; diff --git a/extensions/matrix/src/matrix/client/config.ts b/extensions/matrix/src/matrix/client/config.ts index 3c6c0da66..1cc28cc2b 100644 --- a/extensions/matrix/src/matrix/client/config.ts +++ b/extensions/matrix/src/matrix/client/config.ts @@ -1,28 +1,70 @@ import { MatrixClient } from "@vector-im/matrix-bot-sdk"; -import type { CoreConfig } from "../types.js"; -import type { MatrixAuth, MatrixResolvedConfig } from "./types.js"; +import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "openclaw/plugin-sdk"; + +import type { CoreConfig, MatrixAccountConfig, MatrixConfig } from "../types.js"; import { getMatrixRuntime } from "../../runtime.js"; import { ensureMatrixSdkLoggingConfigured } from "./logging.js"; +import type { MatrixAuth, MatrixResolvedConfig } from "./types.js"; +import { importCredentials } from "../import-mutex.js"; function clean(value?: string): string { return value?.trim() ?? ""; } +/** + * Get account-specific config from channels.matrix.accounts[accountId] + */ +function resolveAccountConfig( + cfg: CoreConfig, + accountId: string, +): MatrixAccountConfig | undefined { + const accounts = cfg.channels?.matrix?.accounts; + if (!accounts || typeof accounts !== "object") { + return undefined; + } + const direct = accounts[accountId] as MatrixAccountConfig | undefined; + if (direct) return direct; + + const normalized = normalizeAccountId(accountId); + const matchKey = Object.keys(accounts).find( + (key) => normalizeAccountId(key) === normalized + ); + return matchKey ? (accounts[matchKey] as MatrixAccountConfig | undefined) : undefined; +} + +/** + * Merge base matrix config with account-specific overrides + */ +function mergeMatrixAccountConfig(cfg: CoreConfig, accountId: string): MatrixAccountConfig { + const base = cfg.channels?.matrix ?? {}; + const { accounts: _ignored, ...baseConfig } = base as MatrixConfig; + const accountConfig = resolveAccountConfig(cfg, accountId) ?? {}; + return { ...baseConfig, ...accountConfig }; +} + export function resolveMatrixConfig( cfg: CoreConfig = getMatrixRuntime().config.loadConfig() as CoreConfig, env: NodeJS.ProcessEnv = process.env, + accountId?: string, ): MatrixResolvedConfig { - const matrix = cfg.channels?.matrix ?? {}; - const homeserver = clean(matrix.homeserver) || clean(env.MATRIX_HOMESERVER); - const userId = clean(matrix.userId) || clean(env.MATRIX_USER_ID); - const accessToken = clean(matrix.accessToken) || clean(env.MATRIX_ACCESS_TOKEN) || undefined; - const password = clean(matrix.password) || clean(env.MATRIX_PASSWORD) || undefined; - const deviceName = clean(matrix.deviceName) || clean(env.MATRIX_DEVICE_NAME) || undefined; + const normalizedAccountId = normalizeAccountId(accountId); + const isDefaultAccount = normalizedAccountId === DEFAULT_ACCOUNT_ID || normalizedAccountId === "default"; + + // Get merged config for this account + const merged = mergeMatrixAccountConfig(cfg, normalizedAccountId); + + // For default account, allow env var fallbacks + const homeserver = clean(merged.homeserver) || (isDefaultAccount ? clean(env.MATRIX_HOMESERVER) : ""); + const userId = clean(merged.userId) || (isDefaultAccount ? clean(env.MATRIX_USER_ID) : ""); + const accessToken = clean(merged.accessToken) || (isDefaultAccount ? clean(env.MATRIX_ACCESS_TOKEN) : "") || undefined; + const password = clean(merged.password) || (isDefaultAccount ? clean(env.MATRIX_PASSWORD) : "") || undefined; + const deviceName = clean(merged.deviceName) || (isDefaultAccount ? clean(env.MATRIX_DEVICE_NAME) : "") || undefined; const initialSyncLimit = - typeof matrix.initialSyncLimit === "number" - ? Math.max(0, Math.floor(matrix.initialSyncLimit)) + typeof merged.initialSyncLimit === "number" + ? Math.max(0, Math.floor(merged.initialSyncLimit)) : undefined; - const encryption = matrix.encryption ?? false; + const encryption = merged.encryption ?? false; + return { homeserver, userId, @@ -37,22 +79,30 @@ export function resolveMatrixConfig( export async function resolveMatrixAuth(params?: { cfg?: CoreConfig; env?: NodeJS.ProcessEnv; + accountId?: string; }): Promise { const cfg = params?.cfg ?? (getMatrixRuntime().config.loadConfig() as CoreConfig); const env = params?.env ?? process.env; - const resolved = resolveMatrixConfig(cfg, env); + const accountId = params?.accountId; + const resolved = resolveMatrixConfig(cfg, env, accountId); + if (!resolved.homeserver) { - throw new Error("Matrix homeserver is required (matrix.homeserver)"); + throw new Error(`Matrix homeserver is required for account ${accountId ?? "default"} (matrix.homeserver)`); } + const normalizedAccountId = normalizeAccountId(accountId); + const isDefaultAccount = normalizedAccountId === DEFAULT_ACCOUNT_ID || normalizedAccountId === "default"; + + // Only use cached credentials for default account + // Use serialized import to prevent race conditions during parallel account startup const { loadMatrixCredentials, saveMatrixCredentials, credentialsMatchConfig, touchMatrixCredentials, - } = await import("../credentials.js"); + } = await importCredentials(); - const cached = loadMatrixCredentials(env); + const cached = isDefaultAccount ? loadMatrixCredentials(env) : null; const cachedCredentials = cached && credentialsMatchConfig(cached, { @@ -71,13 +121,15 @@ export async function resolveMatrixAuth(params?: { const tempClient = new MatrixClient(resolved.homeserver, resolved.accessToken); const whoami = await tempClient.getUserId(); userId = whoami; - // Save the credentials with the fetched userId - saveMatrixCredentials({ - homeserver: resolved.homeserver, - userId, - accessToken: resolved.accessToken, - }); - } else if (cachedCredentials && cachedCredentials.accessToken === resolved.accessToken) { + // Only save credentials for default account + if (isDefaultAccount) { + saveMatrixCredentials({ + homeserver: resolved.homeserver, + userId, + accessToken: resolved.accessToken, + }); + } + } else if (isDefaultAccount && cachedCredentials && cachedCredentials.accessToken === resolved.accessToken) { touchMatrixCredentials(env); } return { @@ -90,7 +142,8 @@ export async function resolveMatrixAuth(params?: { }; } - if (cachedCredentials) { + // Try cached credentials (only for default account) + if (isDefaultAccount && cachedCredentials) { touchMatrixCredentials(env); return { homeserver: cachedCredentials.homeserver, @@ -103,12 +156,14 @@ export async function resolveMatrixAuth(params?: { } if (!resolved.userId) { - throw new Error("Matrix userId is required when no access token is configured (matrix.userId)"); + throw new Error( + `Matrix userId is required for account ${accountId ?? "default"} when no access token is configured`, + ); } if (!resolved.password) { throw new Error( - "Matrix password is required when no access token is configured (matrix.password)", + `Matrix password is required for account ${accountId ?? "default"} when no access token is configured`, ); } @@ -126,7 +181,7 @@ export async function resolveMatrixAuth(params?: { if (!loginResponse.ok) { const errorText = await loginResponse.text(); - throw new Error(`Matrix login failed: ${errorText}`); + throw new Error(`Matrix login failed for account ${accountId ?? "default"}: ${errorText}`); } const login = (await loginResponse.json()) as { @@ -137,7 +192,7 @@ export async function resolveMatrixAuth(params?: { const accessToken = login.access_token?.trim(); if (!accessToken) { - throw new Error("Matrix login did not return an access token"); + throw new Error(`Matrix login did not return an access token for account ${accountId ?? "default"}`); } const auth: MatrixAuth = { @@ -149,12 +204,15 @@ export async function resolveMatrixAuth(params?: { encryption: resolved.encryption, }; - saveMatrixCredentials({ - homeserver: auth.homeserver, - userId: auth.userId, - accessToken: auth.accessToken, - deviceId: login.device_id, - }); + // Only save credentials for default account + if (isDefaultAccount) { + saveMatrixCredentials({ + homeserver: auth.homeserver, + userId: auth.userId, + accessToken: auth.accessToken, + deviceId: login.device_id, + }); + } return auth; } diff --git a/extensions/matrix/src/matrix/client/create-client.ts b/extensions/matrix/src/matrix/client/create-client.ts index d2dc7eaf8..3f8d11040 100644 --- a/extensions/matrix/src/matrix/client/create-client.ts +++ b/extensions/matrix/src/matrix/client/create-client.ts @@ -1,11 +1,15 @@ -import type { IStorageProvider, ICryptoStorageProvider } from "@vector-im/matrix-bot-sdk"; +import fs from "node:fs"; + import { LogService, MatrixClient, SimpleFsStorageProvider, RustSdkCryptoStorageProvider, } from "@vector-im/matrix-bot-sdk"; -import fs from "node:fs"; + +import { importCryptoNodejs } from "../import-mutex.js"; +import type { IStorageProvider, ICryptoStorageProvider } from "@vector-im/matrix-bot-sdk"; + import { ensureMatrixSdkLoggingConfigured } from "./logging.js"; import { maybeMigrateLegacyStorage, @@ -14,9 +18,7 @@ import { } from "./storage.js"; function sanitizeUserIdList(input: unknown, label: string): string[] { - if (input == null) { - return []; - } + if (input == null) return []; if (!Array.isArray(input)) { LogService.warn( "MatrixClientLite", @@ -65,14 +67,14 @@ export async function createMatrixClient(params: { fs.mkdirSync(storagePaths.cryptoPath, { recursive: true }); try { - const { StoreType } = await import("@matrix-org/matrix-sdk-crypto-nodejs"); - cryptoStorage = new RustSdkCryptoStorageProvider(storagePaths.cryptoPath, StoreType.Sqlite); - } catch (err) { - LogService.warn( - "MatrixClientLite", - "Failed to initialize crypto storage, E2EE disabled:", - err, + // Use serialized import to prevent race conditions with native Rust module + const { StoreType } = await importCryptoNodejs(); + cryptoStorage = new RustSdkCryptoStorageProvider( + storagePaths.cryptoPath, + StoreType.Sqlite, ); + } catch (err) { + LogService.warn("MatrixClientLite", "Failed to initialize crypto storage, E2EE disabled:", err); } } @@ -83,7 +85,12 @@ export async function createMatrixClient(params: { accountId: params.accountId, }); - const client = new MatrixClient(params.homeserver, params.accessToken, storage, cryptoStorage); + const client = new MatrixClient( + params.homeserver, + params.accessToken, + storage, + cryptoStorage, + ); if (client.crypto) { const originalUpdateSyncData = client.crypto.updateSyncData.bind(client.crypto); diff --git a/extensions/matrix/src/matrix/client/logging.ts b/extensions/matrix/src/matrix/client/logging.ts index c5ef702b0..5a7180597 100644 --- a/extensions/matrix/src/matrix/client/logging.ts +++ b/extensions/matrix/src/matrix/client/logging.ts @@ -3,33 +3,32 @@ import { ConsoleLogger, LogService } from "@vector-im/matrix-bot-sdk"; let matrixSdkLoggingConfigured = false; const matrixSdkBaseLogger = new ConsoleLogger(); -function shouldSuppressMatrixHttpNotFound(module: string, messageOrObject: unknown[]): boolean { - if (module !== "MatrixHttpClient") { - return false; - } +function shouldSuppressMatrixHttpNotFound( + module: string, + messageOrObject: unknown[], +): boolean { + if (module !== "MatrixHttpClient") return false; return messageOrObject.some((entry) => { - if (!entry || typeof entry !== "object") { - return false; - } + if (!entry || typeof entry !== "object") return false; return (entry as { errcode?: string }).errcode === "M_NOT_FOUND"; }); } export function ensureMatrixSdkLoggingConfigured(): void { - if (matrixSdkLoggingConfigured) { - return; - } + if (matrixSdkLoggingConfigured) return; matrixSdkLoggingConfigured = true; LogService.setLogger({ - trace: (module, ...messageOrObject) => matrixSdkBaseLogger.trace(module, ...messageOrObject), - debug: (module, ...messageOrObject) => matrixSdkBaseLogger.debug(module, ...messageOrObject), - info: (module, ...messageOrObject) => matrixSdkBaseLogger.info(module, ...messageOrObject), - warn: (module, ...messageOrObject) => matrixSdkBaseLogger.warn(module, ...messageOrObject), + trace: (module, ...messageOrObject) => + matrixSdkBaseLogger.trace(module, ...messageOrObject), + debug: (module, ...messageOrObject) => + matrixSdkBaseLogger.debug(module, ...messageOrObject), + info: (module, ...messageOrObject) => + matrixSdkBaseLogger.info(module, ...messageOrObject), + warn: (module, ...messageOrObject) => + matrixSdkBaseLogger.warn(module, ...messageOrObject), error: (module, ...messageOrObject) => { - if (shouldSuppressMatrixHttpNotFound(module, messageOrObject)) { - return; - } + if (shouldSuppressMatrixHttpNotFound(module, messageOrObject)) return; matrixSdkBaseLogger.error(module, ...messageOrObject); }, }); diff --git a/extensions/matrix/src/matrix/client/shared.ts b/extensions/matrix/src/matrix/client/shared.ts index 201eb5bbd..e2163328c 100644 --- a/extensions/matrix/src/matrix/client/shared.ts +++ b/extensions/matrix/src/matrix/client/shared.ts @@ -1,10 +1,11 @@ -import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import { LogService } from "@vector-im/matrix-bot-sdk"; +import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; + import type { CoreConfig } from "../types.js"; -import type { MatrixAuth } from "./types.js"; -import { resolveMatrixAuth } from "./config.js"; import { createMatrixClient } from "./create-client.js"; +import { resolveMatrixAuth } from "./config.js"; import { DEFAULT_ACCOUNT_KEY } from "./storage.js"; +import type { MatrixAuth } from "./types.js"; type SharedMatrixClientState = { client: MatrixClient; @@ -13,6 +14,12 @@ type SharedMatrixClientState = { cryptoReady: boolean; }; +// Multi-account support: Map of accountKey -> client state +const sharedClients = new Map(); +const sharedClientPromises = new Map>(); +const sharedClientStartPromises = new Map>(); + +// Legacy single-client references (for backwards compatibility) let sharedClientState: SharedMatrixClientState | null = null; let sharedClientPromise: Promise | null = null; let sharedClientStartPromise: Promise | null = null; @@ -27,10 +34,14 @@ function buildSharedClientKey(auth: MatrixAuth, accountId?: string | null): stri ].join("|"); } +function getAccountKey(accountId?: string | null): string { + return accountId ?? DEFAULT_ACCOUNT_KEY; +} + async function createSharedMatrixClient(params: { auth: MatrixAuth; timeoutMs?: number; - accountId?: string | null; + accountId?: string; }): Promise { const client = await createMatrixClient({ homeserver: params.auth.homeserver, @@ -53,15 +64,24 @@ async function ensureSharedClientStarted(params: { timeoutMs?: number; initialSyncLimit?: number; encryption?: boolean; + accountId?: string | null; }): Promise { - if (params.state.started) { + if (params.state.started) return; + + const accountKey = getAccountKey(params.accountId); + const existingPromise = sharedClientStartPromises.get(accountKey); + if (existingPromise) { + await existingPromise; return; } - if (sharedClientStartPromise) { + + // Legacy compatibility + if (sharedClientStartPromise && !params.accountId) { await sharedClientStartPromise; return; } - sharedClientStartPromise = (async () => { + + const startPromise = (async () => { const client = params.state.client; // Initialize crypto if enabled @@ -80,10 +100,19 @@ async function ensureSharedClientStarted(params: { await client.start(); params.state.started = true; })(); + + sharedClientStartPromises.set(accountKey, startPromise); + if (!params.accountId) { + sharedClientStartPromise = startPromise; + } + try { - await sharedClientStartPromise; + await startPromise; } finally { - sharedClientStartPromise = null; + sharedClientStartPromises.delete(accountKey); + if (!params.accountId) { + sharedClientStartPromise = null; + } } } @@ -97,23 +126,67 @@ export async function resolveSharedMatrixClient( accountId?: string | null; } = {}, ): Promise { - const auth = params.auth ?? (await resolveMatrixAuth({ cfg: params.cfg, env: params.env })); + const auth = params.auth ?? (await resolveMatrixAuth({ cfg: params.cfg, env: params.env, accountId: params.accountId ?? undefined })); const key = buildSharedClientKey(auth, params.accountId); + const accountKey = getAccountKey(params.accountId); const shouldStart = params.startClient !== false; - if (sharedClientState?.key === key) { + // Check if we already have this client in the multi-account map + const existingClient = sharedClients.get(accountKey); + if (existingClient?.key === key) { + if (shouldStart) { + await ensureSharedClientStarted({ + state: existingClient, + timeoutMs: params.timeoutMs, + initialSyncLimit: auth.initialSyncLimit, + encryption: auth.encryption, + accountId: params.accountId, + }); + } + // Update legacy reference for default account + if (!params.accountId || params.accountId === DEFAULT_ACCOUNT_KEY) { + sharedClientState = existingClient; + } + return existingClient.client; + } + + // Legacy compatibility: check old single-client state + if (!params.accountId && sharedClientState?.key === key) { if (shouldStart) { await ensureSharedClientStarted({ state: sharedClientState, timeoutMs: params.timeoutMs, initialSyncLimit: auth.initialSyncLimit, encryption: auth.encryption, + accountId: params.accountId, }); } return sharedClientState.client; } - if (sharedClientPromise) { + // Check for pending creation promise for this account + const pendingPromise = sharedClientPromises.get(accountKey); + if (pendingPromise) { + const pending = await pendingPromise; + if (pending.key === key) { + if (shouldStart) { + await ensureSharedClientStarted({ + state: pending, + timeoutMs: params.timeoutMs, + initialSyncLimit: auth.initialSyncLimit, + encryption: auth.encryption, + accountId: params.accountId, + }); + } + return pending.client; + } + // Key mismatch - stop old client + pending.client.stop(); + sharedClients.delete(accountKey); + } + + // Legacy: check old single-client promise + if (!params.accountId && sharedClientPromise) { const pending = await sharedClientPromise; if (pending.key === key) { if (shouldStart) { @@ -122,6 +195,7 @@ export async function resolveSharedMatrixClient( timeoutMs: params.timeoutMs, initialSyncLimit: auth.initialSyncLimit, encryption: auth.encryption, + accountId: params.accountId, }); } return pending.client; @@ -131,25 +205,39 @@ export async function resolveSharedMatrixClient( sharedClientPromise = null; } - sharedClientPromise = createSharedMatrixClient({ + // Create new client + const createPromise = createSharedMatrixClient({ auth, timeoutMs: params.timeoutMs, - accountId: params.accountId, + accountId: params.accountId ?? undefined, }); + + sharedClientPromises.set(accountKey, createPromise); + if (!params.accountId || params.accountId === DEFAULT_ACCOUNT_KEY) { + sharedClientPromise = createPromise; + } + try { - const created = await sharedClientPromise; - sharedClientState = created; + const created = await createPromise; + sharedClients.set(accountKey, created); + if (!params.accountId || params.accountId === DEFAULT_ACCOUNT_KEY) { + sharedClientState = created; + } if (shouldStart) { await ensureSharedClientStarted({ state: created, timeoutMs: params.timeoutMs, initialSyncLimit: auth.initialSyncLimit, encryption: auth.encryption, + accountId: params.accountId, }); } return created.client; } finally { - sharedClientPromise = null; + sharedClientPromises.delete(accountKey); + if (!params.accountId || params.accountId === DEFAULT_ACCOUNT_KEY) { + sharedClientPromise = null; + } } } @@ -162,9 +250,28 @@ export async function waitForMatrixSync(_params: { // This is kept for API compatibility but is essentially a no-op now } -export function stopSharedClient(): void { - if (sharedClientState) { - sharedClientState.client.stop(); - sharedClientState = null; +export function stopSharedClient(accountId?: string | null): void { + if (accountId) { + // Stop specific account + const accountKey = getAccountKey(accountId); + const client = sharedClients.get(accountKey); + if (client) { + client.client.stop(); + sharedClients.delete(accountKey); + } + // Also clear legacy reference if it matches + if (sharedClientState?.key === client?.key) { + sharedClientState = null; + } + } else { + // Stop all clients (legacy behavior + all multi-account clients) + for (const [key, client] of sharedClients) { + client.client.stop(); + sharedClients.delete(key); + } + if (sharedClientState) { + sharedClientState.client.stop(); + sharedClientState = null; + } } } diff --git a/extensions/matrix/src/matrix/client/storage.ts b/extensions/matrix/src/matrix/client/storage.ts index 1c9dfbf33..b5ddffe88 100644 --- a/extensions/matrix/src/matrix/client/storage.ts +++ b/extensions/matrix/src/matrix/client/storage.ts @@ -2,8 +2,9 @@ import crypto from "node:crypto"; import fs from "node:fs"; import os from "node:os"; import path from "node:path"; -import type { MatrixStoragePaths } from "./types.js"; + import { getMatrixRuntime } from "../../runtime.js"; +import type { MatrixStoragePaths } from "./types.js"; export const DEFAULT_ACCOUNT_KEY = "default"; const STORAGE_META_FILENAME = "storage-meta.json"; @@ -20,9 +21,7 @@ function sanitizePathSegment(value: string): string { function resolveHomeserverKey(homeserver: string): string { try { const url = new URL(homeserver); - if (url.host) { - return sanitizePathSegment(url.host); - } + if (url.host) return sanitizePathSegment(url.host); } catch { // fall through } @@ -83,14 +82,11 @@ export function maybeMigrateLegacyStorage(params: { const hasLegacyStorage = fs.existsSync(legacy.storagePath); const hasLegacyCrypto = fs.existsSync(legacy.cryptoPath); const hasNewStorage = - fs.existsSync(params.storagePaths.storagePath) || fs.existsSync(params.storagePaths.cryptoPath); + fs.existsSync(params.storagePaths.storagePath) || + fs.existsSync(params.storagePaths.cryptoPath); - if (!hasLegacyStorage && !hasLegacyCrypto) { - return; - } - if (hasNewStorage) { - return; - } + if (!hasLegacyStorage && !hasLegacyCrypto) return; + if (hasNewStorage) return; fs.mkdirSync(params.storagePaths.rootDir, { recursive: true }); if (hasLegacyStorage) { @@ -124,7 +120,11 @@ export function writeStorageMeta(params: { createdAt: new Date().toISOString(), }; fs.mkdirSync(params.storagePaths.rootDir, { recursive: true }); - fs.writeFileSync(params.storagePaths.metaPath, JSON.stringify(payload, null, 2), "utf-8"); + fs.writeFileSync( + params.storagePaths.metaPath, + JSON.stringify(payload, null, 2), + "utf-8", + ); } catch { // ignore meta write failures } diff --git a/extensions/matrix/src/matrix/credentials.ts b/extensions/matrix/src/matrix/credentials.ts index 04072dc72..45388462d 100644 --- a/extensions/matrix/src/matrix/credentials.ts +++ b/extensions/matrix/src/matrix/credentials.ts @@ -1,6 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; + import { getMatrixRuntime } from "../runtime.js"; export type MatrixStoredCredentials = { @@ -18,7 +19,8 @@ export function resolveMatrixCredentialsDir( env: NodeJS.ProcessEnv = process.env, stateDir?: string, ): string { - const resolvedStateDir = stateDir ?? getMatrixRuntime().state.resolveStateDir(env, os.homedir); + const resolvedStateDir = + stateDir ?? getMatrixRuntime().state.resolveStateDir(env, os.homedir); return path.join(resolvedStateDir, "credentials", "matrix"); } @@ -32,9 +34,7 @@ export function loadMatrixCredentials( ): MatrixStoredCredentials | null { const credPath = resolveMatrixCredentialsPath(env); try { - if (!fs.existsSync(credPath)) { - return null; - } + if (!fs.existsSync(credPath)) return null; const raw = fs.readFileSync(credPath, "utf-8"); const parsed = JSON.parse(raw) as Partial; if ( @@ -73,9 +73,7 @@ export function saveMatrixCredentials( export function touchMatrixCredentials(env: NodeJS.ProcessEnv = process.env): void { const existing = loadMatrixCredentials(env); - if (!existing) { - return; - } + if (!existing) return; existing.lastUsedAt = new Date().toISOString(); const credPath = resolveMatrixCredentialsPath(env); diff --git a/extensions/matrix/src/matrix/deps.ts b/extensions/matrix/src/matrix/deps.ts index 67fb5244a..ddbe60d57 100644 --- a/extensions/matrix/src/matrix/deps.ts +++ b/extensions/matrix/src/matrix/deps.ts @@ -1,8 +1,9 @@ -import type { RuntimeEnv } from "openclaw/plugin-sdk"; import fs from "node:fs"; -import { createRequire } from "node:module"; import path from "node:path"; +import { createRequire } from "node:module"; import { fileURLToPath } from "node:url"; + +import type { RuntimeEnv } from "openclaw/plugin-sdk"; import { getMatrixRuntime } from "../runtime.js"; const MATRIX_SDK_PACKAGE = "@vector-im/matrix-bot-sdk"; @@ -26,9 +27,7 @@ export async function ensureMatrixSdkInstalled(params: { runtime: RuntimeEnv; confirm?: (message: string) => Promise; }): Promise { - if (isMatrixSdkAvailable()) { - return; - } + if (isMatrixSdkAvailable()) return; const confirm = params.confirm; if (confirm) { const ok = await confirm("Matrix requires @vector-im/matrix-bot-sdk. Install now?"); @@ -53,8 +52,6 @@ export async function ensureMatrixSdkInstalled(params: { ); } if (!isMatrixSdkAvailable()) { - throw new Error( - "Matrix dependency install completed but @vector-im/matrix-bot-sdk is still missing.", - ); + throw new Error("Matrix dependency install completed but @vector-im/matrix-bot-sdk is still missing."); } } diff --git a/extensions/matrix/src/matrix/format.test.ts b/extensions/matrix/src/matrix/format.test.ts index 4538c2792..5ae98c97c 100644 --- a/extensions/matrix/src/matrix/format.test.ts +++ b/extensions/matrix/src/matrix/format.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "vitest"; + import { markdownToMatrixHtml } from "./format.js"; describe("markdownToMatrixHtml", () => { diff --git a/extensions/matrix/src/matrix/import-mutex.ts b/extensions/matrix/src/matrix/import-mutex.ts new file mode 100644 index 000000000..820c9fa4a --- /dev/null +++ b/extensions/matrix/src/matrix/import-mutex.ts @@ -0,0 +1,88 @@ +/** + * Import Mutex - Serializes dynamic imports to prevent race conditions + * + * Problem: When multiple Matrix accounts start in parallel, they all call + * dynamic imports simultaneously. Native modules (like @matrix-org/matrix-sdk-crypto-nodejs) + * can crash when loaded in parallel from multiple promises. + * + * Solution: Cache the import promise so that concurrent callers await the same promise + * instead of triggering parallel imports. + */ + +// Cache for import promises - key is module specifier +const importCache = new Map>(); + +/** + * Safely import a module with deduplication. + * If an import is already in progress, returns the existing promise. + * Once resolved, the result is cached for future calls. + */ +export async function serializedImport( + moduleSpecifier: string, + importFn: () => Promise +): Promise { + const existing = importCache.get(moduleSpecifier); + if (existing) { + return existing as Promise; + } + + const importPromise = importFn().catch((err) => { + // On failure, remove from cache to allow retry + importCache.delete(moduleSpecifier); + throw err; + }); + + importCache.set(moduleSpecifier, importPromise); + return importPromise; +} + +// Pre-cached imports for critical modules +let cryptoNodejsModule: typeof import("@matrix-org/matrix-sdk-crypto-nodejs") | null = null; +let credentialsModule: typeof import("./credentials.js") | null = null; + +/** + * Safely import the crypto-nodejs module (Rust native). + * This is the most critical one - parallel imports of native modules crash. + */ +export async function importCryptoNodejs(): Promise { + if (cryptoNodejsModule) return cryptoNodejsModule; + + const mod = await serializedImport( + "@matrix-org/matrix-sdk-crypto-nodejs", + () => import("@matrix-org/matrix-sdk-crypto-nodejs") + ); + cryptoNodejsModule = mod; + return mod; +} + +/** + * Safely import the credentials module. + */ +export async function importCredentials(): Promise { + if (credentialsModule) return credentialsModule; + + const mod = await serializedImport( + "../credentials.js", + () => import("./credentials.js") + ); + credentialsModule = mod; + return mod; +} + +// Pre-cached import for matrix index module +let matrixIndexModule: typeof import("./index.js") | null = null; + +/** + * Safely import the matrix/index.js module. + * This is called from channel.ts during parallel account startup. + */ +export async function importMatrixIndex(): Promise { + if (matrixIndexModule) return matrixIndexModule; + + const mod = await serializedImport( + "./matrix/index.js", + () => import("./index.js") + ); + matrixIndexModule = mod; + return mod; +} diff --git a/extensions/matrix/src/matrix/monitor/allowlist.ts b/extensions/matrix/src/matrix/monitor/allowlist.ts index b110dc9ef..373b48000 100644 --- a/extensions/matrix/src/matrix/monitor/allowlist.ts +++ b/extensions/matrix/src/matrix/monitor/allowlist.ts @@ -22,9 +22,7 @@ export function resolveMatrixAllowListMatch(params: { userName?: string; }): MatrixAllowListMatch { const allowList = params.allowList; - if (allowList.length === 0) { - return { allowed: false }; - } + if (allowList.length === 0) return { allowed: false }; if (allowList.includes("*")) { return { allowed: true, matchKey: "*", matchSource: "wildcard" }; } @@ -39,9 +37,7 @@ export function resolveMatrixAllowListMatch(params: { { value: localPart, source: "localpart" }, ]; for (const candidate of candidates) { - if (!candidate.value) { - continue; - } + if (!candidate.value) continue; if (allowList.includes(candidate.value)) { return { allowed: true, diff --git a/extensions/matrix/src/matrix/monitor/auto-join.ts b/extensions/matrix/src/matrix/monitor/auto-join.ts index 6fb36b93f..77d15296e 100644 --- a/extensions/matrix/src/matrix/monitor/auto-join.ts +++ b/extensions/matrix/src/matrix/monitor/auto-join.ts @@ -1,6 +1,7 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; -import type { RuntimeEnv } from "openclaw/plugin-sdk"; import { AutojoinRoomsMixin } from "@vector-im/matrix-bot-sdk"; + +import type { RuntimeEnv } from "openclaw/plugin-sdk"; import type { CoreConfig } from "../../types.js"; import { getMatrixRuntime } from "../../runtime.js"; @@ -12,9 +13,7 @@ export function registerMatrixAutoJoin(params: { const { client, cfg, runtime } = params; const core = getMatrixRuntime(); const logVerbose = (message: string) => { - if (!core.logging.shouldLogVerbose()) { - return; - } + if (!core.logging.shouldLogVerbose()) return; runtime.log?.(message); }; const autoJoin = cfg.channels?.matrix?.autoJoin ?? "always"; @@ -33,9 +32,7 @@ export function registerMatrixAutoJoin(params: { // For "allowlist" mode, handle invites manually client.on("room.invite", async (roomId: string, _inviteEvent: unknown) => { - if (autoJoin !== "allowlist") { - return; - } + if (autoJoin !== "allowlist") return; // Get room alias if available let alias: string | undefined; diff --git a/extensions/matrix/src/matrix/monitor/debug-log.ts b/extensions/matrix/src/matrix/monitor/debug-log.ts new file mode 100644 index 000000000..2b47ff5eb --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/debug-log.ts @@ -0,0 +1,14 @@ +import fs from "node:fs"; +import path from "node:path"; + +const DEBUG_LOG_PATH = "/home/keller/clawd/agents/mondo-assistant/debug-matrix.log"; + +export function debugLog(message: string): void { + const timestamp = new Date().toISOString(); + const line = `[${timestamp}] ${message}\n`; + try { + fs.appendFileSync(DEBUG_LOG_PATH, line); + } catch { + // Ignore errors + } +} diff --git a/extensions/matrix/src/matrix/monitor/direct.ts b/extensions/matrix/src/matrix/monitor/direct.ts index 5cd6e8875..cd2234fdd 100644 --- a/extensions/matrix/src/matrix/monitor/direct.ts +++ b/extensions/matrix/src/matrix/monitor/direct.ts @@ -12,16 +12,17 @@ type DirectRoomTrackerOptions = { const DM_CACHE_TTL_MS = 30_000; -export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTrackerOptions = {}) { +export function createDirectRoomTracker( + client: MatrixClient, + opts: DirectRoomTrackerOptions = {}, +) { const log = opts.log ?? (() => {}); let lastDmUpdateMs = 0; let cachedSelfUserId: string | null = null; const memberCountCache = new Map(); const ensureSelfUserId = async (): Promise => { - if (cachedSelfUserId) { - return cachedSelfUserId; - } + if (cachedSelfUserId) return cachedSelfUserId; try { cachedSelfUserId = await client.getUserId(); } catch { @@ -32,9 +33,7 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr const refreshDmCache = async (): Promise => { const now = Date.now(); - if (now - lastDmUpdateMs < DM_CACHE_TTL_MS) { - return; - } + if (now - lastDmUpdateMs < DM_CACHE_TTL_MS) return; lastDmUpdateMs = now; try { await client.dms.update(); @@ -62,9 +61,7 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr const hasDirectFlag = async (roomId: string, userId?: string): Promise => { const target = userId?.trim(); - if (!target) { - return false; - } + if (!target) return false; try { const state = await client.getRoomStateEvent(roomId, "m.room.member", target); return state?.is_direct === true; @@ -97,7 +94,11 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr return true; } - log(`matrix: dm check room=${roomId} result=group members=${memberCount ?? "unknown"}`); + log( + `matrix: dm check room=${roomId} result=group members=${ + memberCount ?? "unknown" + }`, + ); return false; }, }; diff --git a/extensions/matrix/src/matrix/monitor/events.ts b/extensions/matrix/src/matrix/monitor/events.ts index 1faeffc81..1c604f872 100644 --- a/extensions/matrix/src/matrix/monitor/events.ts +++ b/extensions/matrix/src/matrix/monitor/events.ts @@ -1,5 +1,6 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { PluginRuntime } from "openclaw/plugin-sdk"; + import type { MatrixAuth } from "../client.js"; import type { MatrixRawEvent } from "./types.js"; import { EventType } from "./types.js"; @@ -25,7 +26,41 @@ export function registerMatrixMonitorEvents(params: { onRoomMessage, } = params; - client.on("room.message", onRoomMessage); + // Track processed event IDs to avoid double-processing from room.message + room.decrypted_event + const processedEvents = new Set(); + const PROCESSED_EVENTS_MAX = 1000; + + const deduplicatedHandler = async (roomId: string, event: MatrixRawEvent, source: string) => { + const eventId = event?.event_id; + if (!eventId) { + logVerboseMessage(`matrix: ${source} event has no id, processing anyway`); + await onRoomMessage(roomId, event); + return; + } + + if (processedEvents.has(eventId)) { + logVerboseMessage(`matrix: ${source} skipping duplicate event id=${eventId}`); + return; + } + + processedEvents.add(eventId); + // Prevent memory leak by clearing old entries + if (processedEvents.size > PROCESSED_EVENTS_MAX) { + const iterator = processedEvents.values(); + for (let i = 0; i < 100; i++) { + const next = iterator.next(); + if (next.done) break; + processedEvents.delete(next.value); + } + } + + logVerboseMessage(`matrix: ${source} processing event id=${eventId} room=${roomId}`); + await onRoomMessage(roomId, event); + }; + + client.on("room.message", (roomId: string, event: MatrixRawEvent) => { + deduplicatedHandler(roomId, event, "room.message"); + }); client.on("room.encrypted_event", (roomId: string, event: MatrixRawEvent) => { const eventId = event?.event_id ?? "unknown"; @@ -36,12 +71,21 @@ export function registerMatrixMonitorEvents(params: { client.on("room.decrypted_event", (roomId: string, event: MatrixRawEvent) => { const eventId = event?.event_id ?? "unknown"; const eventType = event?.type ?? "unknown"; + const content = event?.content as Record | undefined; + const hasFile = content && "file" in content; + const hasUrl = content && "url" in content; + // DEBUG: Always log decrypted events with file info + console.log(`[MATRIX-E2EE-DEBUG] decrypted_event room=${roomId} type=${eventType} id=${eventId} hasFile=${hasFile} hasUrl=${hasUrl}`); logVerboseMessage(`matrix: decrypted event room=${roomId} type=${eventType} id=${eventId}`); + // Process decrypted messages through the deduplicated handler + deduplicatedHandler(roomId, event, "room.decrypted_event"); }); client.on( "room.failed_decryption", async (roomId: string, event: MatrixRawEvent, error: Error) => { + // DEBUG: Always log failed decryption + console.log(`[MATRIX-E2EE-DEBUG] FAILED_DECRYPTION room=${roomId} id=${event.event_id ?? "unknown"} error=${error.message}`); logger.warn( { roomId, eventId: event.event_id, error: error.message }, "Failed to decrypt message", @@ -83,7 +127,8 @@ export function registerMatrixMonitorEvents(params: { const hint = formatNativeDependencyHint({ packageName: "@matrix-org/matrix-sdk-crypto-nodejs", manager: "pnpm", - downloadCommand: "node node_modules/@matrix-org/matrix-sdk-crypto-nodejs/download-lib.js", + downloadCommand: + "node node_modules/@matrix-org/matrix-sdk-crypto-nodejs/download-lib.js", }); const warning = `matrix: encryption enabled but crypto is unavailable; ${hint}`; logger.warn({ roomId }, warning); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 6f45f5ed3..3a2872da5 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1,4 +1,14 @@ import type { LocationMessageEventContent, MatrixClient } from "@vector-im/matrix-bot-sdk"; +import fs from "node:fs"; + +// File-based debug logging +const DEBUG_LOG = "/home/keller/clawd/agents/mondo-assistant/matrix-debug.log"; +function debugWrite(msg: string) { + try { + fs.appendFileSync(DEBUG_LOG, `[${new Date().toISOString()}] ${msg}\n`); + } catch { /* ignore */ } +} + import { createReplyPrefixContext, createTypingCallbacks, @@ -9,30 +19,25 @@ import { type RuntimeEnv, } from "openclaw/plugin-sdk"; import type { CoreConfig, ReplyToMode } from "../../types.js"; -import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js"; import { formatPollAsText, isPollStartType, parsePollStartContent, type PollStartContent, } from "../poll-types.js"; -import { - reactMatrixMessage, - sendMessageMatrix, - sendReadReceiptMatrix, - sendTypingMatrix, -} from "../send.js"; +import { reactMatrixMessage, sendMessageMatrix, sendReadReceiptMatrix, sendTypingMatrix } from "../send.js"; import { resolveMatrixAllowListMatch, resolveMatrixAllowListMatches, normalizeAllowListLower, } from "./allowlist.js"; -import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js"; import { downloadMatrixMedia } from "./media.js"; import { resolveMentions } from "./mentions.js"; import { deliverMatrixReplies } from "./replies.js"; import { resolveMatrixRoomConfig } from "./rooms.js"; import { resolveMatrixThreadRootId, resolveMatrixThreadTarget } from "./threads.js"; +import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js"; +import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js"; import { EventType, RelationType } from "./types.js"; export type MatrixMonitorHandlerParams = { @@ -41,7 +46,7 @@ export type MatrixMonitorHandlerParams = { logging: { shouldLogVerbose: () => boolean; }; - channel: (typeof import("openclaw/plugin-sdk"))["channel"]; + channel: typeof import("openclaw/plugin-sdk")["channel"]; system: { enqueueSystemEvent: ( text: string, @@ -63,7 +68,7 @@ export type MatrixMonitorHandlerParams = { : Record | undefined : Record | undefined; mentionRegexes: ReturnType< - (typeof import("openclaw/plugin-sdk"))["channel"]["mentions"]["buildMentionRegexes"] + typeof import("openclaw/plugin-sdk")["channel"]["mentions"]["buildMentionRegexes"] >; groupPolicy: "open" | "allowlist" | "disabled"; replyToMode: ReplyToMode; @@ -81,10 +86,10 @@ export type MatrixMonitorHandlerParams = { selfUserId: string; }) => Promise; }; - getRoomInfo: ( - roomId: string, - ) => Promise<{ name?: string; canonicalAlias?: string; altAliases: string[] }>; + getRoomInfo: (roomId: string) => Promise<{ name?: string; canonicalAlias?: string; altAliases: string[] }>; getMemberDisplayName: (roomId: string, userId: string) => Promise; + /** Account ID for multi-account routing */ + accountId?: string | null; }; export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParams) { @@ -110,12 +115,15 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam directTracker, getRoomInfo, getMemberDisplayName, + accountId, } = params; return async (roomId: string, event: MatrixRawEvent) => { + debugWrite(`HANDLER-START: room=${roomId} eventId=${event.event_id ?? "unknown"} type=${event.type} sender=${event.sender} accountId=${accountId ?? "default"}`); try { const eventType = event.type; if (eventType === EventType.RoomMessageEncrypted) { + debugWrite(`HANDLER: SKIP encrypted event (should be auto-decrypted)`); // Encrypted messages are decrypted automatically by @vector-im/matrix-bot-sdk with crypto enabled return; } @@ -124,24 +132,17 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const locationContent = event.content as LocationMessageEventContent; const isLocationEvent = eventType === EventType.Location || - (eventType === EventType.RoomMessage && locationContent.msgtype === EventType.Location); - if (eventType !== EventType.RoomMessage && !isPollEvent && !isLocationEvent) { - return; - } + (eventType === EventType.RoomMessage && + locationContent.msgtype === EventType.Location); + if (eventType !== EventType.RoomMessage && !isPollEvent && !isLocationEvent) return; logVerboseMessage( `matrix: room.message recv room=${roomId} type=${eventType} id=${event.event_id ?? "unknown"}`, ); - if (event.unsigned?.redacted_because) { - return; - } + if (event.unsigned?.redacted_because) return; const senderId = event.sender; - if (!senderId) { - return; - } + if (!senderId) return; const selfUserId = await client.getUserId(); - if (senderId === selfUserId) { - return; - } + if (senderId === selfUserId) return; const eventTs = event.origin_server_ts; const eventAge = event.unsigned?.age; if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) { @@ -157,7 +158,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const roomInfo = await getRoomInfo(roomId); const roomName = roomInfo.name; - const roomAliases = [roomInfo.canonicalAlias ?? "", ...roomInfo.altAliases].filter(Boolean); + const roomAliases = [ + roomInfo.canonicalAlias ?? "", + ...roomInfo.altAliases, + ].filter(Boolean); let content = event.content as RoomMessageEventContent; if (isPollEvent) { @@ -186,9 +190,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const relates = content["m.relates_to"]; if (relates && "rel_type" in relates) { - if (relates.rel_type === RelationType.Replace) { - return; - } + if (relates.rel_type === RelationType.Replace) return; } const isDirectMessage = await directTracker.isDirectMessage({ @@ -198,9 +200,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }); const isRoom = !isDirectMessage; - if (isRoom && groupPolicy === "disabled") { - return; - } + if (isRoom && groupPolicy === "disabled") return; const roomConfigInfo = isRoom ? resolveMatrixRoomConfig({ @@ -233,9 +233,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } const senderName = await getMemberDisplayName(roomId, senderId); - const storeAllowFrom = await core.channel.pairing - .readAllowFromStore("matrix") - .catch(() => []); + const storeAllowFrom = await core.channel.pairing.readAllowFromStore("matrix").catch(() => []); const effectiveAllowFrom = normalizeAllowListLower([...allowFrom, ...storeAllowFrom]); const groupAllowFrom = cfg.channels?.matrix?.groupAllowFrom ?? []; const effectiveGroupAllowFrom = normalizeAllowListLower([ @@ -245,9 +243,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const groupAllowConfigured = effectiveGroupAllowFrom.length > 0; if (isDirectMessage) { - if (!dmEnabled || dmPolicy === "disabled") { - return; - } + if (!dmEnabled || dmPolicy === "disabled") return; if (dmPolicy !== "open") { const allowMatch = resolveMatrixAllowListMatch({ allowList: effectiveAllowFrom, @@ -329,8 +325,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam logVerboseMessage(`matrix: allow room ${roomId} (${roomMatchMeta})`); } - const rawBody = - locationPayload?.text ?? (typeof content.body === "string" ? content.body.trim() : ""); + const rawBody = locationPayload?.text + ?? (typeof content.body === "string" ? content.body.trim() : ""); let media: { path: string; contentType?: string; @@ -343,7 +339,14 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam ? content.file : undefined; const mediaUrl = contentUrl ?? contentFile?.url; + + // DEBUG: Log media detection + const msgtype = "msgtype" in content ? content.msgtype : undefined; + debugWrite(`HANDLER: room=${roomId} sender=${senderId} msgtype=${msgtype} contentUrl=${contentUrl ?? "none"} mediaUrl=${mediaUrl ?? "none"} accountId=${accountId ?? "default"}`); + logVerboseMessage(`matrix: content check msgtype=${msgtype} contentUrl=${contentUrl ?? "none"} mediaUrl=${mediaUrl ?? "none"} rawBody="${rawBody.slice(0,50)}"`); + if (!rawBody && !mediaUrl) { + debugWrite(`HANDLER: SKIP - no rawBody and no mediaUrl`); return; } @@ -352,8 +355,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam ? (content.info as { mimetype?: string; size?: number }) : undefined; const contentType = contentInfo?.mimetype; - const contentSize = typeof contentInfo?.size === "number" ? contentInfo.size : undefined; + const contentSize = + typeof contentInfo?.size === "number" ? contentInfo.size : undefined; if (mediaUrl?.startsWith("mxc://")) { + debugWrite(`HANDLER: attempting media download url=${mediaUrl} size=${contentSize ?? "unknown"} maxBytes=${mediaMaxBytes}`); + logVerboseMessage(`matrix: attempting media download url=${mediaUrl} size=${contentSize ?? "unknown"} maxBytes=${mediaMaxBytes}`); try { media = await downloadMatrixMedia({ client, @@ -363,15 +369,19 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam maxBytes: mediaMaxBytes, file: contentFile, }); + debugWrite(`HANDLER: media download SUCCESS path=${media?.path ?? "none"}`); + logVerboseMessage(`matrix: media download success path=${media?.path ?? "none"}`); } catch (err) { + debugWrite(`HANDLER: media download FAILED: ${String(err)}`); logVerboseMessage(`matrix: media download failed: ${String(err)}`); } + } else if (mediaUrl) { + debugWrite(`HANDLER: skipping non-mxc url=${mediaUrl}`); + logVerboseMessage(`matrix: skipping non-mxc media url=${mediaUrl}`); } const bodyText = rawBody || media?.placeholder || ""; - if (!bodyText) { - return; - } + if (!bodyText) return; const { wasMentioned, hasExplicitMention } = resolveMentions({ content, @@ -461,6 +471,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const route = core.channel.routing.resolveAgentRoute({ cfg, channel: "matrix", + accountId: accountId ?? undefined, peer: { kind: isDirectMessage ? "dm" : "channel", id: isDirectMessage ? senderId : roomId, @@ -512,7 +523,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam MediaPath: media?.path, MediaType: media?.contentType, MediaUrl: media?.path, - ...locationPayload?.context, + ...(locationPayload?.context ?? {}), CommandAuthorized: commandAuthorized, CommandSource: "text" as const, OriginatingChannel: "matrix" as const, @@ -533,11 +544,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam : undefined, onRecordError: (err) => { logger.warn( - { - error: String(err), - storePath, - sessionKey: ctxPayload.SessionKey ?? route.sessionKey, - }, + { error: String(err), storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey }, "failed updating session meta", ); }, @@ -551,19 +558,19 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam const shouldAckReaction = () => Boolean( ackReaction && - core.channel.reactions.shouldAckReaction({ - scope: ackScope, - isDirect: isDirectMessage, - isGroup: isRoom, - isMentionableGroup: isRoom, - requireMention: Boolean(shouldRequireMention), - canDetectMention, - effectiveWasMentioned: wasMentioned || shouldBypassMention, - shouldBypassMention, - }), + core.channel.reactions.shouldAckReaction({ + scope: ackScope, + isDirect: isDirectMessage, + isGroup: isRoom, + isMentionableGroup: isRoom, + requireMention: Boolean(shouldRequireMention), + canDetectMention, + effectiveWasMentioned: wasMentioned || shouldBypassMention, + shouldBypassMention, + }), ); if (shouldAckReaction() && messageId) { - reactMatrixMessage(roomId, messageId, ackReaction, client).catch((err) => { + reactMatrixMessage(roomId, messageId, ackReaction, { client }).catch((err) => { logVerboseMessage(`matrix react failed for room ${roomId}: ${String(err)}`); }); } @@ -648,9 +655,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }, }); markDispatchIdle(); - if (!queuedFinal) { - return; - } + if (!queuedFinal) return; didSendReply = true; const finalCount = counts.final; logVerboseMessage( diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 4ac87b251..5f0dca70f 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -1,8 +1,11 @@ import { format } from "node:util"; -import { mergeAllowlist, summarizeMapping, type RuntimeEnv } from "openclaw/plugin-sdk"; + +import { + mergeAllowlist, + summarizeMapping, + type RuntimeEnv, +} from "openclaw/plugin-sdk"; import type { CoreConfig, ReplyToMode } from "../../types.js"; -import { resolveMatrixTargets } from "../../resolve-targets.js"; -import { getMatrixRuntime } from "../../runtime.js"; import { setActiveMatrixClient } from "../active-client.js"; import { isBunRuntime, @@ -15,6 +18,8 @@ import { createDirectRoomTracker } from "./direct.js"; import { registerMatrixMonitorEvents } from "./events.js"; import { createMatrixRoomMessageHandler } from "./handler.js"; import { createMatrixRoomInfoResolver } from "./room-info.js"; +import { resolveMatrixTargets } from "../../resolve-targets.js"; +import { getMatrixRuntime } from "../../runtime.js"; export type MonitorMatrixOpts = { runtime?: RuntimeEnv; @@ -33,9 +38,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi } const core = getMatrixRuntime(); let cfg = core.config.loadConfig() as CoreConfig; - if (cfg.channels?.matrix?.enabled === false) { - return; - } + if (cfg.channels?.matrix?.enabled === false) return; const logger = core.logging.getChildLogger({ module: "matrix-auto-reply" }); const formatRuntimeMessage = (...args: Parameters) => format(...args); @@ -51,22 +54,14 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi }, }; const logVerboseMessage = (message: string) => { - if (!core.logging.shouldLogVerbose()) { - return; - } + if (!core.logging.shouldLogVerbose()) return; logger.debug(message); }; const normalizeUserEntry = (raw: string) => - raw - .replace(/^matrix:/i, "") - .replace(/^user:/i, "") - .trim(); + raw.replace(/^matrix:/i, "").replace(/^user:/i, "").trim(); const normalizeRoomEntry = (raw: string) => - raw - .replace(/^matrix:/i, "") - .replace(/^(room|channel):/i, "") - .trim(); + raw.replace(/^matrix:/i, "").replace(/^(room|channel):/i, "").trim(); const isMatrixUserId = (value: string) => value.startsWith("@") && value.includes(":"); const allowlistOnly = cfg.channels?.matrix?.allowlistOnly === true; @@ -118,9 +113,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const pending: Array<{ input: string; query: string }> = []; for (const entry of entries) { const trimmed = entry.trim(); - if (!trimmed) { - continue; - } + if (!trimmed) continue; const cleaned = normalizeRoomEntry(trimmed); if (cleaned.startsWith("!") && cleaned.includes(":")) { if (!nextRooms[cleaned]) { @@ -140,9 +133,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi }); resolved.forEach((entry, index) => { const source = pending[index]; - if (!source) { - return; - } + if (!source) return; if (entry.resolved && entry.id) { if (!nextRooms[entry.id]) { nextRooms[entry.id] = roomsConfig[source.input]; @@ -172,7 +163,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi }, }; - const auth = await resolveMatrixAuth({ cfg }); + const auth = await resolveMatrixAuth({ cfg, accountId: opts.accountId }); const resolvedInitialSyncLimit = typeof opts.initialSyncLimit === "number" ? Math.max(0, Math.floor(opts.initialSyncLimit)) @@ -187,7 +178,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi startClient: false, accountId: opts.accountId, }); - setActiveMatrixClient(client); + setActiveMatrixClient(client, opts.accountId); const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg); const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; @@ -232,6 +223,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi directTracker, getRoomInfo, getMemberDisplayName, + accountId: opts.accountId, }); registerMatrixMonitorEvents({ @@ -265,20 +257,17 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi logger.info("matrix: device verification requested - please verify in another client"); } } catch (err) { - logger.debug( - { error: String(err) }, - "Device verification request failed (may already be verified)", - ); + logger.debug({ error: String(err) }, "Device verification request failed (may already be verified)"); } } await new Promise((resolve) => { const onAbort = () => { try { - logVerboseMessage("matrix: stopping client"); - stopSharedClient(); + logVerboseMessage(`matrix: stopping client for account ${opts.accountId ?? "default"}`); + stopSharedClient(opts.accountId); } finally { - setActiveMatrixClient(null); + setActiveMatrixClient(null, opts.accountId); resolve(); } }; diff --git a/extensions/matrix/src/matrix/monitor/location.ts b/extensions/matrix/src/matrix/monitor/location.ts index 41c91aecc..8d7aecc13 100644 --- a/extensions/matrix/src/matrix/monitor/location.ts +++ b/extensions/matrix/src/matrix/monitor/location.ts @@ -1,4 +1,5 @@ import type { LocationMessageEventContent } from "@vector-im/matrix-bot-sdk"; + import { formatLocationText, toLocationContext, @@ -19,37 +20,25 @@ type GeoUriParams = { function parseGeoUri(value: string): GeoUriParams | null { const trimmed = value.trim(); - if (!trimmed) { - return null; - } - if (!trimmed.toLowerCase().startsWith("geo:")) { - return null; - } + if (!trimmed) return null; + if (!trimmed.toLowerCase().startsWith("geo:")) return null; const payload = trimmed.slice(4); const [coordsPart, ...paramParts] = payload.split(";"); const coords = coordsPart.split(","); - if (coords.length < 2) { - return null; - } + if (coords.length < 2) return null; const latitude = Number.parseFloat(coords[0] ?? ""); const longitude = Number.parseFloat(coords[1] ?? ""); - if (!Number.isFinite(latitude) || !Number.isFinite(longitude)) { - return null; - } + if (!Number.isFinite(latitude) || !Number.isFinite(longitude)) return null; const params = new Map(); for (const part of paramParts) { const segment = part.trim(); - if (!segment) { - continue; - } + if (!segment) continue; const eqIndex = segment.indexOf("="); const rawKey = eqIndex === -1 ? segment : segment.slice(0, eqIndex); const rawValue = eqIndex === -1 ? "" : segment.slice(eqIndex + 1); const key = rawKey.trim().toLowerCase(); - if (!key) { - continue; - } + if (!key) continue; const valuePart = rawValue.trim(); params.set(key, valuePart ? decodeURIComponent(valuePart) : ""); } @@ -72,17 +61,11 @@ export function resolveMatrixLocation(params: { const isLocation = eventType === EventType.Location || (eventType === EventType.RoomMessage && content.msgtype === EventType.Location); - if (!isLocation) { - return null; - } + if (!isLocation) return null; const geoUri = typeof content.geo_uri === "string" ? content.geo_uri.trim() : ""; - if (!geoUri) { - return null; - } + if (!geoUri) return null; const parsed = parseGeoUri(geoUri); - if (!parsed) { - return null; - } + if (!parsed) return null; const caption = typeof content.body === "string" ? content.body.trim() : ""; const location: NormalizedLocation = { latitude: parsed.latitude, diff --git a/extensions/matrix/src/matrix/monitor/media.test.ts b/extensions/matrix/src/matrix/monitor/media.test.ts index 590dd5148..eabbc0d4c 100644 --- a/extensions/matrix/src/matrix/monitor/media.test.ts +++ b/extensions/matrix/src/matrix/monitor/media.test.ts @@ -1,5 +1,6 @@ -import type { PluginRuntime } from "openclaw/plugin-sdk"; import { beforeEach, describe, expect, it, vi } from "vitest"; + +import type { PluginRuntime } from "openclaw/plugin-sdk"; import { setMatrixRuntime } from "../../runtime.js"; import { downloadMatrixMedia } from "./media.js"; diff --git a/extensions/matrix/src/matrix/monitor/media.ts b/extensions/matrix/src/matrix/monitor/media.ts index c88bfc061..f7a1d7cc0 100644 --- a/extensions/matrix/src/matrix/monitor/media.ts +++ b/extensions/matrix/src/matrix/monitor/media.ts @@ -1,4 +1,5 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; + import { getMatrixRuntime } from "../../runtime.js"; // Type for encrypted file info @@ -23,19 +24,21 @@ async function fetchMatrixMediaBuffer(params: { }): Promise<{ buffer: Buffer; headerType?: string } | null> { // @vector-im/matrix-bot-sdk provides mxcToHttp helper const url = params.client.mxcToHttp(params.mxcUrl); - if (!url) { - return null; - } + if (!url) return null; // Use the client's download method which handles auth try { - const buffer = await params.client.downloadContent(params.mxcUrl); + // downloadContent returns {data: Buffer, contentType: string} + const response = await params.client.downloadContent(params.mxcUrl); + const buffer = response.data; + const contentType = response.contentType; + if (buffer.byteLength > params.maxBytes) { throw new Error("Matrix media exceeds configured size limit"); } - return { buffer: Buffer.from(buffer) }; + return { buffer: Buffer.from(buffer), headerType: contentType }; } catch (err) { - throw new Error(`Matrix media download failed: ${String(err)}`, { cause: err }); + throw new Error(`Matrix media download failed: ${String(err)}`); } } @@ -75,7 +78,10 @@ export async function downloadMatrixMedia(params: { placeholder: string; } | null> { let fetched: { buffer: Buffer; headerType?: string } | null; - if (typeof params.sizeBytes === "number" && params.sizeBytes > params.maxBytes) { + if ( + typeof params.sizeBytes === "number" && + params.sizeBytes > params.maxBytes + ) { throw new Error("Matrix media exceeds configured size limit"); } @@ -95,9 +101,7 @@ export async function downloadMatrixMedia(params: { }); } - if (!fetched) { - return null; - } + if (!fetched) return null; const headerType = fetched.headerType ?? params.contentType ?? undefined; const saved = await getMatrixRuntime().channel.media.saveMediaBuffer( fetched.buffer, diff --git a/extensions/matrix/src/matrix/monitor/replies.ts b/extensions/matrix/src/matrix/monitor/replies.ts index 1193d59f8..a4640f3cc 100644 --- a/extensions/matrix/src/matrix/monitor/replies.ts +++ b/extensions/matrix/src/matrix/monitor/replies.ts @@ -1,7 +1,8 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; + import type { MarkdownTableMode, ReplyPayload, RuntimeEnv } from "openclaw/plugin-sdk"; -import { getMatrixRuntime } from "../../runtime.js"; import { sendMessageMatrix } from "../send.js"; +import { getMatrixRuntime } from "../../runtime.js"; export async function deliverMatrixReplies(params: { replies: ReplyPayload[]; @@ -61,9 +62,7 @@ export async function deliverMatrixReplies(params: { chunkMode, )) { const trimmed = chunk.trim(); - if (!trimmed) { - continue; - } + if (!trimmed) continue; await sendMessageMatrix(params.roomId, trimmed, { client: params.client, replyToId: shouldIncludeReply(replyToId) ? replyToId : undefined, diff --git a/extensions/matrix/src/matrix/monitor/room-info.ts b/extensions/matrix/src/matrix/monitor/room-info.ts index 764147d35..cad377e1a 100644 --- a/extensions/matrix/src/matrix/monitor/room-info.ts +++ b/extensions/matrix/src/matrix/monitor/room-info.ts @@ -11,14 +11,14 @@ export function createMatrixRoomInfoResolver(client: MatrixClient) { const getRoomInfo = async (roomId: string): Promise => { const cached = roomInfoCache.get(roomId); - if (cached) { - return cached; - } + if (cached) return cached; let name: string | undefined; let canonicalAlias: string | undefined; let altAliases: string[] = []; try { - const nameState = await client.getRoomStateEvent(roomId, "m.room.name", "").catch(() => null); + const nameState = await client + .getRoomStateEvent(roomId, "m.room.name", "") + .catch(() => null); name = nameState?.name; } catch { // ignore @@ -37,7 +37,10 @@ export function createMatrixRoomInfoResolver(client: MatrixClient) { return info; }; - const getMemberDisplayName = async (roomId: string, userId: string): Promise => { + const getMemberDisplayName = async ( + roomId: string, + userId: string, + ): Promise => { try { const memberState = await client .getRoomStateEvent(roomId, "m.room.member", userId) diff --git a/extensions/matrix/src/matrix/monitor/rooms.ts b/extensions/matrix/src/matrix/monitor/rooms.ts index ed705e837..086048a76 100644 --- a/extensions/matrix/src/matrix/monitor/rooms.ts +++ b/extensions/matrix/src/matrix/monitor/rooms.ts @@ -1,5 +1,5 @@ -import { buildChannelKeyCandidates, resolveChannelEntryMatch } from "openclaw/plugin-sdk"; import type { MatrixRoomConfig } from "../../types.js"; +import { buildChannelKeyCandidates, resolveChannelEntryMatch } from "openclaw/plugin-sdk"; export type MatrixRoomConfigResolved = { allowed: boolean; @@ -24,12 +24,7 @@ export function resolveMatrixRoomConfig(params: { ...params.aliases, params.name ?? "", ); - const { - entry: matched, - key: matchedKey, - wildcardEntry, - wildcardKey, - } = resolveChannelEntryMatch({ + const { entry: matched, key: matchedKey, wildcardEntry, wildcardKey } = resolveChannelEntryMatch({ entries: rooms, keys: candidates, wildcardKey: "*", diff --git a/extensions/matrix/src/matrix/monitor/threads.ts b/extensions/matrix/src/matrix/monitor/threads.ts index a38495716..4d618f329 100644 --- a/extensions/matrix/src/matrix/monitor/threads.ts +++ b/extensions/matrix/src/matrix/monitor/threads.ts @@ -28,9 +28,7 @@ export function resolveMatrixThreadTarget(params: { isThreadRoot?: boolean; }): string | undefined { const { threadReplies, messageId, threadRootId } = params; - if (threadReplies === "off") { - return undefined; - } + if (threadReplies === "off") return undefined; const isThreadRoot = params.isThreadRoot === true; const hasInboundThread = Boolean(threadRootId && threadRootId !== messageId && !isThreadRoot); if (threadReplies === "inbound") { @@ -47,9 +45,7 @@ export function resolveMatrixThreadRootId(params: { content: RoomMessageEventContent; }): string | undefined { const relates = params.content["m.relates_to"]; - if (!relates || typeof relates !== "object") { - return undefined; - } + if (!relates || typeof relates !== "object") return undefined; if ("rel_type" in relates && relates.rel_type === RelationType.Thread) { if ("event_id" in relates && typeof relates.event_id === "string") { return relates.event_id; diff --git a/extensions/matrix/src/matrix/poll-types.test.ts b/extensions/matrix/src/matrix/poll-types.test.ts index 7f1797d99..f2d885622 100644 --- a/extensions/matrix/src/matrix/poll-types.test.ts +++ b/extensions/matrix/src/matrix/poll-types.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "vitest"; + import { parsePollStartContent } from "./poll-types.js"; describe("parsePollStartContent", () => { diff --git a/extensions/matrix/src/matrix/poll-types.ts b/extensions/matrix/src/matrix/poll-types.ts index 29897d895..cd641ebb4 100644 --- a/extensions/matrix/src/matrix/poll-types.ts +++ b/extensions/matrix/src/matrix/poll-types.ts @@ -77,25 +77,18 @@ export function isPollStartType(eventType: string): boolean { } export function getTextContent(text?: TextContent): string { - if (!text) { - return ""; - } + if (!text) return ""; return text["m.text"] ?? text["org.matrix.msc1767.text"] ?? text.body ?? ""; } export function parsePollStartContent(content: PollStartContent): PollSummary | null { - const poll = - (content as Record)[M_POLL_START] ?? - (content as Record)[ORG_POLL_START] ?? - (content as Record)["m.poll"]; - if (!poll) { - return null; - } + const poll = (content as Record)[M_POLL_START] + ?? (content as Record)[ORG_POLL_START] + ?? (content as Record)["m.poll"]; + if (!poll) return null; const question = getTextContent(poll.question); - if (!question) { - return null; - } + if (!question) return null; const answers = poll.answers .map((answer) => getTextContent(answer)) @@ -131,9 +124,7 @@ function buildTextContent(body: string): TextContent { } function buildPollFallbackText(question: string, answers: string[]): string { - if (answers.length === 0) { - return question; - } + if (answers.length === 0) return question; return `${question}\n${answers.map((answer, idx) => `${idx + 1}. ${answer}`).join("\n")}`; } diff --git a/extensions/matrix/src/matrix/send.test.ts b/extensions/matrix/src/matrix/send.test.ts index 0ebfc826f..2bba70e6c 100644 --- a/extensions/matrix/src/matrix/send.test.ts +++ b/extensions/matrix/src/matrix/send.test.ts @@ -1,5 +1,6 @@ -import type { PluginRuntime } from "openclaw/plugin-sdk"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; + +import type { PluginRuntime } from "openclaw/plugin-sdk"; import { setMatrixRuntime } from "../runtime.js"; vi.mock("@vector-im/matrix-bot-sdk", () => ({ diff --git a/extensions/matrix/src/matrix/send.ts b/extensions/matrix/src/matrix/send.ts index b9bfae4fe..55c2293e8 100644 --- a/extensions/matrix/src/matrix/send.ts +++ b/extensions/matrix/src/matrix/send.ts @@ -1,4 +1,5 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; + import type { PollInput } from "openclaw/plugin-sdk"; import { getMatrixRuntime } from "../runtime.js"; import { buildPollStartContent, M_POLL_START } from "./poll-types.js"; @@ -45,6 +46,7 @@ export async function sendMessageMatrix( const { client, stopOnDone } = await resolveMatrixClient({ client: opts.client, timeoutMs: opts.timeoutMs, + accountId: opts.accountId, }); try { const roomId = await resolveMatrixRoomId(client, to); @@ -122,9 +124,7 @@ export async function sendMessageMatrix( const followupRelation = threadId ? relation : undefined; for (const chunk of textChunks) { const text = chunk.trim(); - if (!text) { - continue; - } + if (!text) continue; const followup = buildTextContent(text, followupRelation); const followupEventId = await sendContent(followup); lastMessageId = followupEventId ?? lastMessageId; @@ -132,9 +132,7 @@ export async function sendMessageMatrix( } else { for (const chunk of chunks.length ? chunks : [""]) { const text = chunk.trim(); - if (!text) { - continue; - } + if (!text) continue; const content = buildTextContent(text, relation); const eventId = await sendContent(content); lastMessageId = eventId ?? lastMessageId; @@ -214,9 +212,7 @@ export async function sendReadReceiptMatrix( eventId: string, client?: MatrixClient, ): Promise { - if (!eventId?.trim()) { - return; - } + if (!eventId?.trim()) return; const { client: resolved, stopOnDone } = await resolveMatrixClient({ client, }); @@ -234,13 +230,14 @@ export async function reactMatrixMessage( roomId: string, messageId: string, emoji: string, - client?: MatrixClient, + opts: { client?: MatrixClient; accountId?: string | null } = {}, ): Promise { if (!emoji.trim()) { throw new Error("Matrix reaction requires an emoji"); } const { client: resolved, stopOnDone } = await resolveMatrixClient({ - client, + client: opts.client, + accountId: opts.accountId, }); try { const resolvedRoom = await resolveMatrixRoomId(resolved, roomId); diff --git a/extensions/matrix/src/matrix/send/client.ts b/extensions/matrix/src/matrix/send/client.ts index aa0f3badb..0cc745608 100644 --- a/extensions/matrix/src/matrix/send/client.ts +++ b/extensions/matrix/src/matrix/send/client.ts @@ -1,5 +1,5 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; -import type { CoreConfig } from "../types.js"; + import { getMatrixRuntime } from "../../runtime.js"; import { getActiveMatrixClient } from "../active-client.js"; import { @@ -8,6 +8,7 @@ import { resolveMatrixAuth, resolveSharedMatrixClient, } from "../client.js"; +import type { CoreConfig } from "../types.js"; const getCore = () => getMatrixRuntime(); @@ -28,29 +29,31 @@ export function resolveMediaMaxBytes(): number | undefined { export async function resolveMatrixClient(opts: { client?: MatrixClient; timeoutMs?: number; + accountId?: string | null; }): Promise<{ client: MatrixClient; stopOnDone: boolean }> { ensureNodeRuntime(); - if (opts.client) { - return { client: opts.client, stopOnDone: false }; - } - const active = getActiveMatrixClient(); - if (active) { - return { client: active, stopOnDone: false }; - } + if (opts.client) return { client: opts.client, stopOnDone: false }; + + // Try to get the active client for the specified account + const active = getActiveMatrixClient(opts.accountId); + if (active) return { client: active, stopOnDone: false }; + const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT); if (shouldShareClient) { const client = await resolveSharedMatrixClient({ timeoutMs: opts.timeoutMs, + accountId: opts.accountId, }); return { client, stopOnDone: false }; } - const auth = await resolveMatrixAuth(); + const auth = await resolveMatrixAuth({ accountId: opts.accountId ?? undefined }); const client = await createMatrixClient({ homeserver: auth.homeserver, userId: auth.userId, accessToken: auth.accessToken, encryption: auth.encryption, localTimeoutMs: opts.timeoutMs, + accountId: opts.accountId ?? undefined, }); if (auth.encryption && client.crypto) { try { diff --git a/extensions/matrix/src/matrix/send/formatting.ts b/extensions/matrix/src/matrix/send/formatting.ts index 3189d1e90..ef2edeaf4 100644 --- a/extensions/matrix/src/matrix/send/formatting.ts +++ b/extensions/matrix/src/matrix/send/formatting.ts @@ -1,5 +1,5 @@ -import { getMatrixRuntime } from "../../runtime.js"; import { markdownToMatrixHtml } from "../format.js"; +import { getMatrixRuntime } from "../../runtime.js"; import { MsgType, RelationType, @@ -13,7 +13,10 @@ import { const getCore = () => getMatrixRuntime(); -export function buildTextContent(body: string, relation?: MatrixRelation): MatrixTextContent { +export function buildTextContent( + body: string, + relation?: MatrixRelation, +): MatrixTextContent { const content: MatrixTextContent = relation ? { msgtype: MsgType.Text, @@ -30,32 +33,34 @@ export function buildTextContent(body: string, relation?: MatrixRelation): Matri export function applyMatrixFormatting(content: MatrixFormattedContent, body: string): void { const formatted = markdownToMatrixHtml(body ?? ""); - if (!formatted) { - return; - } + if (!formatted) return; content.format = "org.matrix.custom.html"; content.formatted_body = formatted; } export function buildReplyRelation(replyToId?: string): MatrixReplyRelation | undefined { const trimmed = replyToId?.trim(); - if (!trimmed) { - return undefined; - } + if (!trimmed) return undefined; return { "m.in_reply_to": { event_id: trimmed } }; } -export function buildThreadRelation(threadId: string, replyToId?: string): MatrixThreadRelation { +export function buildThreadRelation( + threadId: string, + replyToId?: string, +): MatrixThreadRelation { const trimmed = threadId.trim(); return { rel_type: RelationType.Thread, event_id: trimmed, is_falling_back: true, - "m.in_reply_to": { event_id: replyToId?.trim() || trimmed }, + "m.in_reply_to": { event_id: (replyToId?.trim() || trimmed) }, }; } -export function resolveMatrixMsgType(contentType?: string, _fileName?: string): MatrixMediaMsgType { +export function resolveMatrixMsgType( + contentType?: string, + _fileName?: string, +): MatrixMediaMsgType { const kind = getCore().media.mediaKindFromMime(contentType ?? ""); switch (kind) { case "image": @@ -74,9 +79,7 @@ export function resolveMatrixVoiceDecision(opts: { contentType?: string; fileName?: string; }): { useVoice: boolean } { - if (!opts.wantsVoice) { - return { useVoice: false }; - } + if (!opts.wantsVoice) return { useVoice: false }; if ( getCore().media.isVoiceCompatibleAudio({ contentType: opts.contentType, diff --git a/extensions/matrix/src/matrix/send/media.ts b/extensions/matrix/src/matrix/send/media.ts index c4339d900..8c564bddb 100644 --- a/extensions/matrix/src/matrix/send/media.ts +++ b/extensions/matrix/src/matrix/send/media.ts @@ -7,8 +7,8 @@ import type { VideoFileInfo, } from "@vector-im/matrix-bot-sdk"; import { parseBuffer, type IFileInfo } from "music-metadata"; + import { getMatrixRuntime } from "../../runtime.js"; -import { applyMatrixFormatting } from "./formatting.js"; import { type MatrixMediaContent, type MatrixMediaInfo, @@ -16,6 +16,7 @@ import { type MatrixRelation, type MediaKind, } from "./types.js"; +import { applyMatrixFormatting } from "./formatting.js"; const getCore = () => getMatrixRuntime(); @@ -53,9 +54,7 @@ export function buildMatrixMediaInfo(params: { }; return timedInfo; } - if (Object.keys(base).length === 0) { - return undefined; - } + if (Object.keys(base).length === 0) return undefined; return base; } @@ -114,12 +113,8 @@ export async function prepareImageInfo(params: { buffer: Buffer; client: MatrixClient; }): Promise { - const meta = await getCore() - .media.getImageMetadata(params.buffer) - .catch(() => null); - if (!meta) { - return undefined; - } + const meta = await getCore().media.getImageMetadata(params.buffer).catch(() => null); + if (!meta) return undefined; const imageInfo: DimensionalFileInfo = { w: meta.width, h: meta.height }; const maxDim = Math.max(meta.width, meta.height); if (maxDim > THUMBNAIL_MAX_SIDE) { @@ -130,9 +125,7 @@ export async function prepareImageInfo(params: { quality: THUMBNAIL_QUALITY, withoutEnlargement: true, }); - const thumbMeta = await getCore() - .media.getImageMetadata(thumbBuffer) - .catch(() => null); + const thumbMeta = await getCore().media.getImageMetadata(thumbBuffer).catch(() => null); const thumbUri = await params.client.uploadContent( thumbBuffer, "image/jpeg", @@ -160,9 +153,7 @@ export async function resolveMediaDurationMs(params: { fileName?: string; kind: MediaKind; }): Promise { - if (params.kind !== "audio" && params.kind !== "video") { - return undefined; - } + if (params.kind !== "audio" && params.kind !== "video") return undefined; try { const fileInfo: IFileInfo | string | undefined = params.contentType || params.fileName @@ -210,7 +201,7 @@ export async function uploadMediaMaybeEncrypted( }, ): Promise<{ url: string; file?: EncryptedFile }> { // Check if room is encrypted and crypto is available - const isEncrypted = client.crypto && (await client.crypto.isRoomEncrypted(roomId)); + const isEncrypted = client.crypto && await client.crypto.isRoomEncrypted(roomId); if (isEncrypted && client.crypto) { // Encrypt the media before uploading diff --git a/extensions/matrix/src/matrix/send/targets.test.ts b/extensions/matrix/src/matrix/send/targets.test.ts index 0bc90327c..7173b1cf6 100644 --- a/extensions/matrix/src/matrix/send/targets.test.ts +++ b/extensions/matrix/src/matrix/send/targets.test.ts @@ -1,5 +1,6 @@ -import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import { beforeEach, describe, expect, it, vi } from "vitest"; + +import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import { EventType } from "./types.js"; let resolveMatrixRoomId: typeof import("./targets.js").resolveMatrixRoomId; @@ -25,9 +26,7 @@ describe("resolveMatrixRoomId", () => { const roomId = await resolveMatrixRoomId(client, userId); expect(roomId).toBe("!room:example.org"); - // oxlint-disable-next-line typescript/unbound-method expect(client.getJoinedRooms).not.toHaveBeenCalled(); - // oxlint-disable-next-line typescript/unbound-method expect(client.setAccountData).not.toHaveBeenCalled(); }); @@ -38,7 +37,10 @@ describe("resolveMatrixRoomId", () => { const client = { getAccountData: vi.fn().mockRejectedValue(new Error("nope")), getJoinedRooms: vi.fn().mockResolvedValue([roomId]), - getJoinedRoomMembers: vi.fn().mockResolvedValue(["@bot:example.org", userId]), + getJoinedRoomMembers: vi.fn().mockResolvedValue([ + "@bot:example.org", + userId, + ]), setAccountData, } as unknown as MatrixClient; @@ -78,9 +80,11 @@ describe("resolveMatrixRoomId", () => { const client = { getAccountData: vi.fn().mockRejectedValue(new Error("nope")), getJoinedRooms: vi.fn().mockResolvedValue([roomId]), - getJoinedRoomMembers: vi - .fn() - .mockResolvedValue(["@bot:example.org", userId, "@extra:example.org"]), + getJoinedRoomMembers: vi.fn().mockResolvedValue([ + "@bot:example.org", + userId, + "@extra:example.org", + ]), setAccountData: vi.fn().mockResolvedValue(undefined), } as unknown as MatrixClient; diff --git a/extensions/matrix/src/matrix/send/targets.ts b/extensions/matrix/src/matrix/send/targets.ts index b3de224eb..6ec6ad6d7 100644 --- a/extensions/matrix/src/matrix/send/targets.ts +++ b/extensions/matrix/src/matrix/send/targets.ts @@ -1,4 +1,5 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; + import { EventType, type MatrixDirectAccountData } from "./types.js"; function normalizeTarget(raw: string): string { @@ -10,9 +11,7 @@ function normalizeTarget(raw: string): string { } export function normalizeThreadId(raw?: string | number | null): string | null { - if (raw === undefined || raw === null) { - return null; - } + if (raw === undefined || raw === null) return null; const trimmed = String(raw).trim(); return trimmed ? trimmed : null; } @@ -26,15 +25,16 @@ async function persistDirectRoom( ): Promise { let directContent: MatrixDirectAccountData | null = null; try { - directContent = await client.getAccountData(EventType.Direct); + directContent = (await client.getAccountData( + EventType.Direct, + )) as MatrixDirectAccountData | null; } catch { // Ignore fetch errors and fall back to an empty map. } - const existing = directContent && !Array.isArray(directContent) ? directContent : {}; + const existing = + directContent && !Array.isArray(directContent) ? directContent : {}; const current = Array.isArray(existing[userId]) ? existing[userId] : []; - if (current[0] === roomId) { - return; - } + if (current[0] === roomId) return; const next = [roomId, ...current.filter((id) => id !== roomId)]; try { await client.setAccountData(EventType.Direct, { @@ -46,21 +46,28 @@ async function persistDirectRoom( } } -async function resolveDirectRoomId(client: MatrixClient, userId: string): Promise { +async function resolveDirectRoomId( + client: MatrixClient, + userId: string, +): Promise { const trimmed = userId.trim(); if (!trimmed.startsWith("@")) { - throw new Error(`Matrix user IDs must be fully qualified (got "${trimmed}")`); + throw new Error( + `Matrix user IDs must be fully qualified (got "${trimmed}")`, + ); } const cached = directRoomCache.get(trimmed); - if (cached) { - return cached; - } + if (cached) return cached; // 1) Fast path: use account data (m.direct) for *this* logged-in user (the bot). try { - const directContent = await client.getAccountData(EventType.Direct); - const list = Array.isArray(directContent?.[trimmed]) ? directContent[trimmed] : []; + const directContent = (await client.getAccountData( + EventType.Direct, + )) as MatrixDirectAccountData | null; + const list = Array.isArray(directContent?.[trimmed]) + ? directContent[trimmed] + : []; if (list.length > 0) { directRoomCache.set(trimmed, list[0]); return list[0]; @@ -81,9 +88,7 @@ async function resolveDirectRoomId(client: MatrixClient, userId: string): Promis } catch { continue; } - if (!members.includes(trimmed)) { - continue; - } + if (!members.includes(trimmed)) continue; // Prefer classic 1:1 rooms, but allow larger rooms if requested. if (members.length === 2) { directRoomCache.set(trimmed, roomId); @@ -107,7 +112,10 @@ async function resolveDirectRoomId(client: MatrixClient, userId: string): Promis throw new Error(`No direct room found for ${trimmed} (m.direct missing)`); } -export async function resolveMatrixRoomId(client: MatrixClient, raw: string): Promise { +export async function resolveMatrixRoomId( + client: MatrixClient, + raw: string, +): Promise { const target = normalizeTarget(raw); const lowered = target.toLowerCase(); if (lowered.startsWith("matrix:")) { diff --git a/extensions/matrix/src/onboarding.ts b/extensions/matrix/src/onboarding.ts index c85f3a25a..dbd4e6027 100644 --- a/extensions/matrix/src/onboarding.ts +++ b/extensions/matrix/src/onboarding.ts @@ -6,17 +6,16 @@ import { type ChannelOnboardingDmPolicy, type WizardPrompter, } from "openclaw/plugin-sdk"; -import type { CoreConfig, DmPolicy } from "./types.js"; import { listMatrixDirectoryGroupsLive } from "./directory-live.js"; import { listMatrixDirectoryPeersLive } from "./directory-live.js"; import { resolveMatrixAccount } from "./matrix/accounts.js"; import { ensureMatrixSdkInstalled, isMatrixSdkAvailable } from "./matrix/deps.js"; +import type { CoreConfig, DmPolicy } from "./types.js"; const channel = "matrix" as const; function setMatrixDmPolicy(cfg: CoreConfig, policy: DmPolicy) { - const allowFrom = - policy === "open" ? addWildcardAllowFrom(cfg.channels?.matrix?.dm?.allowFrom) : undefined; + const allowFrom = policy === "open" ? addWildcardAllowFrom(cfg.channels?.matrix?.dm?.allowFrom) : undefined; return { ...cfg, channels: { @@ -249,12 +248,8 @@ export const matrixOnboardingAdapter: ChannelOnboardingAdapter = { initialValue: existing.homeserver ?? envHomeserver, validate: (value) => { const raw = String(value ?? "").trim(); - if (!raw) { - return "Required"; - } - if (!/^https?:\/\//i.test(raw)) { - return "Use a full URL (https://...)"; - } + if (!raw) return "Required"; + if (!/^https?:\/\//i.test(raw)) return "Use a full URL (https://...)"; return undefined; }, }), @@ -278,13 +273,13 @@ export const matrixOnboardingAdapter: ChannelOnboardingAdapter = { if (!accessToken && !password) { // Ask auth method FIRST before asking for user ID - const authMode = await prompter.select({ + const authMode = (await prompter.select({ message: "Matrix auth method", options: [ { value: "token", label: "Access token (user ID fetched automatically)" }, { value: "password", label: "Password (requires user ID)" }, ], - }); + })) as "token" | "password"; if (authMode === "token") { accessToken = String( @@ -304,15 +299,9 @@ export const matrixOnboardingAdapter: ChannelOnboardingAdapter = { initialValue: existing.userId ?? envUserId, validate: (value) => { const raw = String(value ?? "").trim(); - if (!raw) { - return "Required"; - } - if (!raw.startsWith("@")) { - return "Matrix user IDs should start with @"; - } - if (!raw.includes(":")) { - return "Matrix user IDs should include a server (:server)"; - } + if (!raw) return "Required"; + if (!raw.startsWith("@")) return "Matrix user IDs should start with @"; + if (!raw.includes(":")) return "Matrix user IDs should include a server (:server)"; return undefined; }, }), @@ -380,9 +369,7 @@ export const matrixOnboardingAdapter: ChannelOnboardingAdapter = { const unresolved: string[] = []; for (const entry of accessConfig.entries) { const trimmed = entry.trim(); - if (!trimmed) { - continue; - } + if (!trimmed) continue; const cleaned = trimmed.replace(/^(room|channel):/i, "").trim(); if (cleaned.startsWith("!") && cleaned.includes(":")) { resolvedIds.push(cleaned); @@ -403,7 +390,10 @@ export const matrixOnboardingAdapter: ChannelOnboardingAdapter = { unresolved.push(entry); } } - roomKeys = [...resolvedIds, ...unresolved.map((entry) => entry.trim()).filter(Boolean)]; + roomKeys = [ + ...resolvedIds, + ...unresolved.map((entry) => entry.trim()).filter(Boolean), + ]; if (resolvedIds.length > 0 || unresolved.length > 0) { await prompter.note( [ diff --git a/extensions/matrix/src/outbound.ts b/extensions/matrix/src/outbound.ts index 86e660e66..91a6ced80 100644 --- a/extensions/matrix/src/outbound.ts +++ b/extensions/matrix/src/outbound.ts @@ -1,6 +1,7 @@ import type { ChannelOutboundAdapter } from "openclaw/plugin-sdk"; -import { sendMessageMatrix, sendPollMatrix } from "./matrix/send.js"; + import { getMatrixRuntime } from "./runtime.js"; +import { sendMessageMatrix, sendPollMatrix } from "./matrix/send.js"; export const matrixOutbound: ChannelOutboundAdapter = { deliveryMode: "direct", diff --git a/extensions/matrix/src/resolve-targets.ts b/extensions/matrix/src/resolve-targets.ts index a184247e1..4e776da53 100644 --- a/extensions/matrix/src/resolve-targets.ts +++ b/extensions/matrix/src/resolve-targets.ts @@ -4,15 +4,17 @@ import type { ChannelResolveResult, RuntimeEnv, } from "openclaw/plugin-sdk"; -import { listMatrixDirectoryGroupsLive, listMatrixDirectoryPeersLive } from "./directory-live.js"; + +import { + listMatrixDirectoryGroupsLive, + listMatrixDirectoryPeersLive, +} from "./directory-live.js"; function pickBestGroupMatch( matches: ChannelDirectoryEntry[], query: string, ): ChannelDirectoryEntry | undefined { - if (matches.length === 0) { - return undefined; - } + if (matches.length === 0) return undefined; const normalized = query.trim().toLowerCase(); if (normalized) { const exact = matches.find((match) => { @@ -21,9 +23,7 @@ function pickBestGroupMatch( const id = match.id.trim().toLowerCase(); return name === normalized || handle === normalized || id === normalized; }); - if (exact) { - return exact; - } + if (exact) return exact; } return matches[0]; } diff --git a/extensions/matrix/src/tool-actions.ts b/extensions/matrix/src/tool-actions.ts index 83ccecd7a..eee9a6e45 100644 --- a/extensions/matrix/src/tool-actions.ts +++ b/extensions/matrix/src/tool-actions.ts @@ -1,11 +1,5 @@ import type { AgentToolResult } from "@mariozechner/pi-agent-core"; -import { - createActionGate, - jsonResult, - readNumberParam, - readReactionParams, - readStringParam, -} from "openclaw/plugin-sdk"; + import type { CoreConfig } from "./types.js"; import { deleteMatrixMessage, @@ -21,6 +15,13 @@ import { unpinMatrixMessage, } from "./matrix/actions.js"; import { reactMatrixMessage } from "./matrix/send.js"; +import { + createActionGate, + jsonResult, + readNumberParam, + readReactionParams, + readStringParam, +} from "openclaw/plugin-sdk"; const messageActions = new Set(["sendMessage", "editMessage", "deleteMessage", "readMessages"]); const reactionActions = new Set(["react", "reactions"]); @@ -28,12 +29,8 @@ const pinActions = new Set(["pinMessage", "unpinMessage", "listPins"]); function readRoomId(params: Record, required = true): string { const direct = readStringParam(params, "roomId") ?? readStringParam(params, "channelId"); - if (direct) { - return direct; - } - if (!required) { - return readStringParam(params, "to") ?? ""; - } + if (direct) return direct; + if (!required) return readStringParam(params, "to") ?? ""; return readStringParam(params, "to", { required: true }); } @@ -42,6 +39,7 @@ export async function handleMatrixAction( cfg: CoreConfig, ): Promise> { const action = readStringParam(params, "action", { required: true }); + const accountId = readStringParam(params, "accountId") ?? undefined; const isActionEnabled = createActionGate(cfg.channels?.matrix?.actions); if (reactionActions.has(action)) { @@ -57,13 +55,14 @@ export async function handleMatrixAction( if (remove || isEmpty) { const result = await removeMatrixReactions(roomId, messageId, { emoji: remove ? emoji : undefined, + accountId, }); return jsonResult({ ok: true, removed: result.removed }); } - await reactMatrixMessage(roomId, messageId, emoji); + await reactMatrixMessage(roomId, messageId, emoji, { accountId }); return jsonResult({ ok: true, added: emoji }); } - const reactions = await listMatrixReactions(roomId, messageId); + const reactions = await listMatrixReactions(roomId, messageId, { accountId }); return jsonResult({ ok: true, reactions }); } @@ -79,13 +78,13 @@ export async function handleMatrixAction( allowEmpty: true, }); const mediaUrl = readStringParam(params, "mediaUrl"); - const replyToId = - readStringParam(params, "replyToId") ?? readStringParam(params, "replyTo"); + const replyToId = readStringParam(params, "replyToId") ?? readStringParam(params, "replyTo"); const threadId = readStringParam(params, "threadId"); const result = await sendMatrixMessage(to, content, { mediaUrl: mediaUrl ?? undefined, replyToId: replyToId ?? undefined, threadId: threadId ?? undefined, + accountId, }); return jsonResult({ ok: true, result }); } @@ -93,14 +92,14 @@ export async function handleMatrixAction( const roomId = readRoomId(params); const messageId = readStringParam(params, "messageId", { required: true }); const content = readStringParam(params, "content", { required: true }); - const result = await editMatrixMessage(roomId, messageId, content); + const result = await editMatrixMessage(roomId, messageId, content, { accountId }); return jsonResult({ ok: true, result }); } case "deleteMessage": { const roomId = readRoomId(params); const messageId = readStringParam(params, "messageId", { required: true }); const reason = readStringParam(params, "reason"); - await deleteMatrixMessage(roomId, messageId, { reason: reason ?? undefined }); + await deleteMatrixMessage(roomId, messageId, { reason: reason ?? undefined, accountId }); return jsonResult({ ok: true, deleted: true }); } case "readMessages": { @@ -112,6 +111,7 @@ export async function handleMatrixAction( limit: limit ?? undefined, before: before ?? undefined, after: after ?? undefined, + accountId, }); return jsonResult({ ok: true, ...result }); } @@ -127,15 +127,15 @@ export async function handleMatrixAction( const roomId = readRoomId(params); if (action === "pinMessage") { const messageId = readStringParam(params, "messageId", { required: true }); - const result = await pinMatrixMessage(roomId, messageId); + const result = await pinMatrixMessage(roomId, messageId, { accountId }); return jsonResult({ ok: true, pinned: result.pinned }); } if (action === "unpinMessage") { const messageId = readStringParam(params, "messageId", { required: true }); - const result = await unpinMatrixMessage(roomId, messageId); + const result = await unpinMatrixMessage(roomId, messageId, { accountId }); return jsonResult({ ok: true, pinned: result.pinned }); } - const result = await listMatrixPins(roomId); + const result = await listMatrixPins(roomId, { accountId }); return jsonResult({ ok: true, pinned: result.pinned, events: result.events }); } @@ -147,6 +147,7 @@ export async function handleMatrixAction( const roomId = readStringParam(params, "roomId") ?? readStringParam(params, "channelId"); const result = await getMatrixMemberInfo(userId, { roomId: roomId ?? undefined, + accountId, }); return jsonResult({ ok: true, member: result }); } @@ -156,7 +157,7 @@ export async function handleMatrixAction( throw new Error("Matrix room info is disabled."); } const roomId = readRoomId(params); - const result = await getMatrixRoomInfo(roomId); + const result = await getMatrixRoomInfo(roomId, { accountId }); return jsonResult({ ok: true, room: result }); } diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts index f03734130..17ff35a7a 100644 --- a/extensions/matrix/src/types.ts +++ b/extensions/matrix/src/types.ts @@ -38,10 +38,10 @@ export type MatrixActionConfig = { channelInfo?: boolean; }; -export type MatrixConfig = { +export type MatrixAccountConfig = { /** Optional display name for this account (used in CLI/UI lists). */ name?: string; - /** If false, do not start Matrix. Default: true. */ + /** If false, do not start this account. Default: true. */ enabled?: boolean; /** Matrix homeserver URL (https://matrix.example.org). */ homeserver?: string; @@ -87,9 +87,22 @@ export type MatrixConfig = { actions?: MatrixActionConfig; }; +export type MatrixConfig = { + /** Optional per-account Matrix configuration (multi-account). */ + accounts?: Record; +} & MatrixAccountConfig; + export type CoreConfig = { channels?: { matrix?: MatrixConfig; }; + bindings?: Array<{ + agentId?: string; + match?: { + channel?: string; + accountId?: string; + peer?: { kind?: string; id?: string }; + }; + }>; [key: string]: unknown; }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 124e39669..13170b23a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -129,6 +129,9 @@ importers: markdown-it: specifier: ^14.1.0 version: 14.1.0 + nats: + specifier: ^2.19.0 + version: 2.29.3 node-edge-tts: specifier: ^1.2.9 version: 1.2.9 @@ -317,7 +320,7 @@ importers: devDependencies: openclaw: specifier: workspace:* - version: link:../.. + version: 2026.2.1(@napi-rs/canvas@0.1.89)(@types/express@5.0.6)(audio-decode@2.2.3)(node-llama-cpp@3.15.1(typescript@5.9.3))(signal-polyfill@0.2.2) extensions/imessage: devDependencies: @@ -343,7 +346,7 @@ importers: specifier: workspace:* version: link:../.. - extensions/matrix: + extensions/matrix.upstream-disabled: dependencies: '@matrix-org/matrix-sdk-crypto-nodejs': specifier: ^0.4.0 @@ -375,7 +378,7 @@ importers: devDependencies: openclaw: specifier: workspace:* - version: link:../.. + version: 2026.2.1(@napi-rs/canvas@0.1.89)(@types/express@5.0.6)(audio-decode@2.2.3)(node-llama-cpp@3.15.1(typescript@5.9.3))(signal-polyfill@0.2.2) extensions/memory-lancedb: dependencies: @@ -4212,6 +4215,10 @@ packages: engines: {node: ^18 || >=20} hasBin: true + nats@2.29.3: + resolution: {integrity: sha512-tOQCRCwC74DgBTk4pWZ9V45sk4d7peoE2njVprMRCBXrhJ5q5cYM7i6W+Uvw2qUrcfOSnuisrX7bEx3b3Wx4QA==} + engines: {node: '>= 14.0.0'} + negotiator@0.6.3: resolution: {integrity: sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==} engines: {node: '>= 0.6'} @@ -4224,6 +4231,10 @@ packages: resolution: {integrity: sha512-dBpDMdxv9Irdq66304OLfEmQ9tbNRFnFTuZiLo+bD+r332bBmMJ8GBLXklIXXgxd3+v9+KUnZaUR5PJMa75Gsg==} engines: {node: '>= 0.4.0'} + nkeys.js@1.1.0: + resolution: {integrity: sha512-tB/a0shZL5UZWSwsoeyqfTszONTt4k2YS0tuQioMOD180+MbombYVgzDUYHlx+gejYK6rgf08n/2Df99WY0Sxg==} + engines: {node: '>=10.0.0'} + node-addon-api@8.5.0: resolution: {integrity: sha512-/bRZty2mXUIFY/xU5HLvveNHlswNJej+RnxBjOMkidWfwZzgTbPG1E3K5TOxRLOR+5hX7bSofy8yf1hZevMS8A==} engines: {node: ^18 || ^20 || >= 21} @@ -4369,6 +4380,14 @@ packages: zod: optional: true + openclaw@2026.2.1: + resolution: {integrity: sha512-SCGnsg/E9XPpYd1KCH+hvfQFTg+RLptBAAPbc+9e7PHn7aNzte7mcm+2W/kxn71Aie8jqwbZgWx9JdEPneiaLQ==} + engines: {node: '>=22.12.0'} + hasBin: true + peerDependencies: + '@napi-rs/canvas': ^0.1.89 + node-llama-cpp: 3.15.1 + opus-decoder@0.7.11: resolution: {integrity: sha512-+e+Jz3vGQLxRTBHs8YJQPRPc1Tr+/aC6coV/DlZylriA29BdHQAYXhvNRKtjftof17OFng0+P4wsFIqQu3a48A==} @@ -5071,6 +5090,9 @@ packages: tweetnacl@0.14.5: resolution: {integrity: sha512-KXXFFdAbFXY4geFIwoyNK+f5Z1b7swfXABfL7HXCmoIWMKU3dmS26672A4EeQtDzLKy7SXmfBu51JolvEKwtGA==} + tweetnacl@1.0.3: + resolution: {integrity: sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==} + type-is@1.6.18: resolution: {integrity: sha512-TkRKr9sUTxEH8MdfuCSP7VizJyzRNMjj2J2do2Jr3Kym598JVdEksuzPQCnlFPW4ky9Q+iA+ma9BGm06XQBy8g==} engines: {node: '>= 0.6'} @@ -9555,12 +9577,20 @@ snapshots: nanoid@5.1.6: {} + nats@2.29.3: + dependencies: + nkeys.js: 1.1.0 + negotiator@0.6.3: {} negotiator@1.0.0: {} netmask@2.0.2: {} + nkeys.js@1.1.0: + dependencies: + tweetnacl: 1.0.3 + node-addon-api@8.5.0: {} node-api-headers@1.8.0: {} @@ -9735,6 +9765,80 @@ snapshots: ws: 8.19.0 zod: 4.3.6 + openclaw@2026.2.1(@napi-rs/canvas@0.1.89)(@types/express@5.0.6)(audio-decode@2.2.3)(node-llama-cpp@3.15.1(typescript@5.9.3))(signal-polyfill@0.2.2): + dependencies: + '@agentclientprotocol/sdk': 0.13.1(zod@4.3.6) + '@aws-sdk/client-bedrock': 3.980.0 + '@buape/carbon': 0.14.0(hono@4.11.7) + '@clack/prompts': 1.0.0 + '@grammyjs/runner': 2.0.3(grammy@1.39.3) + '@grammyjs/transformer-throttler': 1.2.1(grammy@1.39.3) + '@homebridge/ciao': 1.3.4 + '@line/bot-sdk': 10.6.0 + '@lydell/node-pty': 1.2.0-beta.3 + '@mariozechner/pi-agent-core': 0.51.0(ws@8.19.0)(zod@4.3.6) + '@mariozechner/pi-ai': 0.51.0(ws@8.19.0)(zod@4.3.6) + '@mariozechner/pi-coding-agent': 0.51.0(ws@8.19.0)(zod@4.3.6) + '@mariozechner/pi-tui': 0.51.0 + '@mozilla/readability': 0.6.0 + '@napi-rs/canvas': 0.1.89 + '@sinclair/typebox': 0.34.47 + '@slack/bolt': 4.6.0(@types/express@5.0.6) + '@slack/web-api': 7.13.0 + '@whiskeysockets/baileys': 7.0.0-rc.9(audio-decode@2.2.3)(sharp@0.34.5) + ajv: 8.17.1 + chalk: 5.6.2 + chokidar: 5.0.0 + cli-highlight: 2.1.11 + commander: 14.0.3 + croner: 10.0.1 + discord-api-types: 0.38.38 + dotenv: 17.2.3 + express: 5.2.1 + file-type: 21.3.0 + grammy: 1.39.3 + hono: 4.11.7 + jiti: 2.6.1 + json5: 2.2.3 + jszip: 3.10.1 + linkedom: 0.18.12 + long: 5.3.2 + markdown-it: 14.1.0 + node-edge-tts: 1.2.9 + node-llama-cpp: 3.15.1(typescript@5.9.3) + osc-progress: 0.3.0 + pdfjs-dist: 5.4.624 + playwright-core: 1.58.1 + proper-lockfile: 4.1.2 + qrcode-terminal: 0.12.0 + sharp: 0.34.5 + signal-utils: 0.21.1(signal-polyfill@0.2.2) + sqlite-vec: 0.1.7-alpha.2 + tar: 7.5.7 + tslog: 4.10.2 + undici: 7.20.0 + ws: 8.19.0 + yaml: 2.8.2 + zod: 4.3.6 + transitivePeerDependencies: + - '@discordjs/opus' + - '@modelcontextprotocol/sdk' + - '@types/express' + - audio-decode + - aws-crt + - bufferutil + - canvas + - debug + - encoding + - ffmpeg-static + - jimp + - link-preview-js + - node-opus + - opusscript + - signal-polyfill + - supports-color + - utf-8-validate + opus-decoder@0.7.11: dependencies: '@wasm-audio-decoders/common': 9.0.7 @@ -10614,6 +10718,8 @@ snapshots: tweetnacl@0.14.5: {} + tweetnacl@1.0.3: {} + type-is@1.6.18: dependencies: media-typer: 0.3.0 diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index efd2427a2..06b88d7f9 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -347,14 +347,27 @@ export async function runEmbeddedAttempt( const eventStoreConfig = params.config?.gateway?.eventStore; if (eventStoreConfig?.enabled) { try { + // Extract agentId from sessionKey (format: "agent:{agentId}:...") + const sessionKeyParts = params.sessionKey?.split(":") ?? []; + const agentId = sessionKeyParts.length >= 2 ? sessionKeyParts[1] : "main"; + + // Check for agent-specific eventStore config + const agentEventConfig = eventStoreConfig.agents?.[agentId]; + const effectiveNatsUrl = + agentEventConfig?.natsUrl || eventStoreConfig.natsUrl || "nats://localhost:4222"; + const effectiveStreamName = + agentEventConfig?.streamName || eventStoreConfig.streamName || "openclaw-events"; + const effectiveSubjectPrefix = + agentEventConfig?.subjectPrefix || eventStoreConfig.subjectPrefix || "openclaw.events"; + const eventContext = await buildEventContext( { - natsUrl: eventStoreConfig.natsUrl || "nats://localhost:4222", - streamName: eventStoreConfig.streamName || "openclaw-events", - subjectPrefix: eventStoreConfig.subjectPrefix || "openclaw.events", + natsUrl: effectiveNatsUrl, + streamName: effectiveStreamName, + subjectPrefix: effectiveSubjectPrefix, }, { - agent: "agent", + agent: agentId, sessionKey: params.sessionKey, hoursBack: 2, maxEvents: 100, @@ -362,7 +375,9 @@ export async function runEmbeddedAttempt( ); if (eventContext.eventsProcessed > 0) { eventContextHint = formatContextForPrompt(eventContext); - log.info(`[event-context] Loaded ${eventContext.eventsProcessed} events for context`); + log.info( + `[event-context] Loaded ${eventContext.eventsProcessed} events for agent=${agentId}`, + ); } } catch (err) { log.warn(`[event-context] Failed to load: ${err}`); diff --git a/src/config/types.gateway.ts b/src/config/types.gateway.ts index 769d600d7..2faf62612 100644 --- a/src/config/types.gateway.ts +++ b/src/config/types.gateway.ts @@ -207,6 +207,15 @@ export type GatewayNodesConfig = { denyCommands?: string[]; }; +export type AgentEventStoreConfig = { + /** NATS server URL with agent-specific credentials. */ + natsUrl: string; + /** JetStream stream name for this agent. */ + streamName: string; + /** Subject prefix override (default: openclaw.events.{agentId}). */ + subjectPrefix?: string; +}; + export type EventStoreConfig = { /** Enable Event Store (NATS JetStream) integration. */ enabled?: boolean; @@ -216,6 +225,8 @@ export type EventStoreConfig = { streamName?: string; /** Subject prefix for events (default: openclaw.events). */ subjectPrefix?: string; + /** Per-agent NATS credentials for isolated event stores. */ + agents?: Record; }; export type GatewayConfig = { diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index 0810e85e7..3a23fa2d2 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -449,6 +449,18 @@ export const OpenClawSchema = z natsUrl: z.string().optional(), streamName: z.string().optional(), subjectPrefix: z.string().optional(), + agents: z + .record( + z.string(), + z + .object({ + natsUrl: z.string(), + streamName: z.string(), + subjectPrefix: z.string().optional(), + }) + .strict(), + ) + .optional(), }) .strict() .optional(), diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 89e6df33c..30d4762d2 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -18,9 +18,9 @@ import { } from "../config/config.js"; import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js"; import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js"; -import { initEventStore, shutdownEventStore } from "../infra/event-store.js"; import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js"; import { logAcceptedEnvOption } from "../infra/env.js"; +import { initEventStore, shutdownEventStore } from "../infra/event-store.js"; import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js"; import { onHeartbeatEvent } from "../infra/heartbeat-events.js"; import { startHeartbeatRunner } from "../infra/heartbeat-runner.js"; @@ -217,7 +217,7 @@ export async function startGatewayServer( } setGatewaySigusr1RestartPolicy({ allowExternal: cfgAtStart.commands?.restart === true }); initSubagentRegistry(); - + // Initialize Event Store if configured const eventStoreConfig = cfgAtStart.gateway?.eventStore; if (eventStoreConfig?.enabled) { @@ -226,10 +226,11 @@ export async function startGatewayServer( natsUrl: eventStoreConfig.natsUrl || "nats://localhost:4222", streamName: eventStoreConfig.streamName || "openclaw-events", subjectPrefix: eventStoreConfig.subjectPrefix || "openclaw.events", + agents: eventStoreConfig.agents, }); log.info("gateway: Event Store initialized"); } - + const defaultAgentId = resolveDefaultAgentId(cfgAtStart); const defaultWorkspaceDir = resolveAgentWorkspaceDir(cfgAtStart, defaultAgentId); const baseMethods = listGatewayMethods(); diff --git a/src/infra/event-context.ts b/src/infra/event-context.ts index 69dc81ff7..fc8e9549c 100644 --- a/src/infra/event-context.ts +++ b/src/infra/event-context.ts @@ -18,6 +18,29 @@ import { const sc = StringCodec(); +/** + * Parse NATS URL and extract credentials if present + * Supports: nats://user:pass@host:port or nats://host:port + */ +function parseNatsUrl(urlString: string): { servers: string; user?: string; pass?: string } { + try { + const httpUrl = urlString.replace(/^nats:\/\//, "http://"); + const url = new URL(httpUrl); + const servers = `${url.hostname}:${url.port || 4222}`; + + if (url.username && url.password) { + return { + servers, + user: decodeURIComponent(url.username), + pass: decodeURIComponent(url.password), + }; + } + return { servers }; + } catch { + return { servers: urlString.replace(/^nats:\/\//, "") }; + } +} + export type EventContextConfig = { natsUrl: string; streamName: string; @@ -90,7 +113,8 @@ async function queryEvents( config: EventContextConfig, options: ContextOptions, ): Promise { - const nc = await connect({ servers: config.natsUrl }); + const { servers, user, pass } = parseNatsUrl(config.natsUrl); + const nc = await connect({ servers, ...(user && pass ? { user, pass } : {}) }); try { const jsm = await nc.jetstreamManager(); diff --git a/src/infra/event-store.ts b/src/infra/event-store.ts index c358c4a3d..2645e28d0 100644 --- a/src/infra/event-store.ts +++ b/src/infra/event-store.ts @@ -5,8 +5,10 @@ * This enables: * - Full audit trail of all interactions * - Context rebuild from events (no more forgetting) - * - Multi-agent event sharing + * - Multi-agent event sharing with isolation * - Time-travel debugging + * + * Supports per-agent NATS credentials for hard isolation. */ import { @@ -22,11 +24,18 @@ import { onAgentEvent } from "./agent-events.js"; const sc = StringCodec(); +export type AgentEventStoreConfig = { + natsUrl: string; + streamName: string; + subjectPrefix?: string; +}; + export type EventStoreConfig = { enabled: boolean; natsUrl: string; streamName: string; subjectPrefix: string; + agents?: Record; }; export type ClawEvent = { @@ -55,8 +64,13 @@ export type EventType = export type Visibility = "public" | "internal" | "confidential"; -let natsConnection: NatsConnection | null = null; -let jetstream: JetStreamClient | null = null; +// Main connection (for default/main agent) +let mainConnection: NatsConnection | null = null; +let mainJetstream: JetStreamClient | null = null; + +// Per-agent connections for isolated agents +const agentConnections = new Map(); + let unsubscribe: (() => void) | null = null; let eventStoreConfig: EventStoreConfig | null = null; @@ -94,14 +108,22 @@ function mapStreamToEventType(stream: string, data: Record): Ev } /** - * Extract agent name from session key - * Format: "main" or "agent-name:session-id" + * Extract agent ID from session key + * Formats: "main" | "agent:main:sessionId" | "agentId:sessionId" */ function extractAgentFromSession(sessionKey?: string): string { - if (!sessionKey) return "unknown"; + if (!sessionKey) return "main"; if (sessionKey === "main") return "main"; + + // Handle "agent:agentId:sessionId" format + if (sessionKey.startsWith("agent:")) { + const parts = sessionKey.split(":"); + return parts[1] || "main"; + } + + // Handle "agentId:sessionId" format const parts = sessionKey.split(":"); - return parts[0] || "unknown"; + return parts[0] || "main"; } /** @@ -124,22 +146,102 @@ function toClawEvent(evt: AgentEventPayload): ClawEvent { }; } +/** + * Parse NATS URL and extract credentials if present + * Supports: nats://user:pass@host:port or nats://host:port + */ +function parseNatsUrl(urlString: string): { servers: string; user?: string; pass?: string } { + try { + const httpUrl = urlString.replace(/^nats:\/\//, "http://"); + const url = new URL(httpUrl); + const servers = `${url.hostname}:${url.port || 4222}`; + + if (url.username && url.password) { + return { + servers, + user: decodeURIComponent(url.username), + pass: decodeURIComponent(url.password), + }; + } + return { servers }; + } catch { + return { servers: urlString.replace(/^nats:\/\//, "") }; + } +} + +/** + * Get or create a connection for a specific agent + */ +async function getAgentConnection( + agentId: string, +): Promise<{ js: JetStreamClient; prefix: string; streamName: string } | null> { + if (!eventStoreConfig) return null; + + // Check for agent-specific config + const agentConfig = eventStoreConfig.agents?.[agentId]; + + if (agentConfig) { + // Check if we already have a connection for this agent + const existing = agentConnections.get(agentId); + if (existing && !existing.nc.isClosed()) { + return { + js: existing.js, + prefix: agentConfig.subjectPrefix || `openclaw.events.${agentId}`, + streamName: agentConfig.streamName, + }; + } + + // Create new connection for this agent + try { + const { servers, user, pass } = parseNatsUrl(agentConfig.natsUrl); + const nc = await connect({ servers, ...(user && pass ? { user, pass } : {}) }); + const js = nc.jetstream(); + + agentConnections.set(agentId, { nc, js }); + console.log(`[event-store] Created isolated connection for agent: ${agentId}`); + + return { + js, + prefix: agentConfig.subjectPrefix || `openclaw.events.${agentId}`, + streamName: agentConfig.streamName, + }; + } catch (err) { + console.error(`[event-store] Failed to create connection for agent ${agentId}:`, err); + // Fall through to use main connection + } + } + + // Use main connection for unconfigured agents + if (mainJetstream) { + return { + js: mainJetstream, + prefix: eventStoreConfig.subjectPrefix, + streamName: eventStoreConfig.streamName, + }; + } + + return null; +} + /** * Publish event to NATS JetStream */ async function publishEvent(evt: AgentEventPayload): Promise { - if (!jetstream || !eventStoreConfig) { - return; - } + if (!eventStoreConfig) return; try { const clawEvent = toClawEvent(evt); - const subject = `${eventStoreConfig.subjectPrefix}.${clawEvent.agent}.${clawEvent.type.replace(/\./g, "_")}`; + const agentId = clawEvent.agent; + + // Get the appropriate connection for this agent + const conn = await getAgentConnection(agentId); + if (!conn) return; + + const subject = `${conn.prefix}.${clawEvent.type.replace(/\./g, "_")}`; const payload = sc.encode(JSON.stringify(clawEvent)); - await jetstream.publish(subject, payload); + await conn.js.publish(subject, payload); } catch (err) { - // Log but don't throw — event store should never break core functionality console.error("[event-store] Failed to publish event:", err); } } @@ -147,25 +249,28 @@ async function publishEvent(evt: AgentEventPayload): Promise { /** * Ensure the JetStream stream exists */ -async function ensureStream(js: JetStreamClient, config: EventStoreConfig): Promise { - const jsm = await natsConnection!.jetstreamManager(); +async function ensureStream( + nc: NatsConnection, + streamName: string, + subjects: string[], +): Promise { + const jsm = await nc.jetstreamManager(); try { - await jsm.streams.info(config.streamName); + await jsm.streams.info(streamName); } catch { - // Stream doesn't exist, create it await jsm.streams.add({ - name: config.streamName, - subjects: [`${config.subjectPrefix}.>`], + name: streamName, + subjects, retention: RetentionPolicy.Limits, max_msgs: -1, max_bytes: -1, - max_age: 0, // Never expire + max_age: 0, storage: StorageType.File, num_replicas: 1, - duplicate_window: 120_000_000_000, // 2 minutes in nanoseconds + duplicate_window: 120_000_000_000, }); - console.log(`[event-store] Created stream: ${config.streamName}`); + console.log(`[event-store] Created stream: ${streamName}`); } } @@ -181,26 +286,31 @@ export async function initEventStore(config: EventStoreConfig): Promise { try { eventStoreConfig = config; - // Connect to NATS - natsConnection = await connect({ servers: config.natsUrl }); - console.log(`[event-store] Connected to NATS at ${config.natsUrl}`); + // Initialize main connection + const { servers, user, pass } = parseNatsUrl(config.natsUrl); + console.log(`[event-store] Connecting to NATS: ${servers} (auth: ${user ? "yes" : "no"})`); - // Get JetStream client - jetstream = natsConnection.jetstream(); + mainConnection = await connect({ servers, ...(user && pass ? { user, pass } : {}) }); + mainJetstream = mainConnection.jetstream(); + console.log(`[event-store] Connected to NATS at ${servers}`); - // Ensure stream exists - await ensureStream(jetstream, config); + // Ensure main stream exists + await ensureStream(mainConnection, config.streamName, [`${config.subjectPrefix}.>`]); + + // Log configured agent isolations + if (config.agents) { + const agentIds = Object.keys(config.agents); + console.log(`[event-store] Agent isolation configured for: ${agentIds.join(", ")}`); + } // Subscribe to all agent events unsubscribe = onAgentEvent((evt) => { - // Fire and forget — don't await to avoid blocking the event loop publishEvent(evt).catch(() => {}); }); console.log("[event-store] Event listener registered"); } catch (err) { console.error("[event-store] Failed to initialize:", err); - // Don't throw — event store failure shouldn't prevent gateway startup } } @@ -213,10 +323,22 @@ export async function shutdownEventStore(): Promise { unsubscribe = null; } - if (natsConnection) { - await natsConnection.drain(); - natsConnection = null; - jetstream = null; + // Close agent connections + for (const [agentId, { nc }] of agentConnections) { + try { + await nc.drain(); + console.log(`[event-store] Closed connection for agent: ${agentId}`); + } catch (err) { + console.error(`[event-store] Error closing connection for ${agentId}:`, err); + } + } + agentConnections.clear(); + + // Close main connection + if (mainConnection) { + await mainConnection.drain(); + mainConnection = null; + mainJetstream = null; } eventStoreConfig = null; @@ -227,15 +349,20 @@ export async function shutdownEventStore(): Promise { * Check if event store is connected */ export function isEventStoreConnected(): boolean { - return natsConnection !== null && !natsConnection.isClosed(); + return mainConnection !== null && !mainConnection.isClosed(); } /** * Get event store status */ -export function getEventStoreStatus(): { connected: boolean; config: EventStoreConfig | null } { +export function getEventStoreStatus(): { + connected: boolean; + config: EventStoreConfig | null; + agentConnections: string[]; +} { return { connected: isEventStoreConnected(), config: eventStoreConfig, + agentConnections: Array.from(agentConnections.keys()), }; }