Compare commits

...

1 commit

Author SHA1 Message Date
0d88d75169 feat(matrix): Add multi-account support
Some checks failed
CI / install-check (push) Has been cancelled
CI / checks (bunx tsc -p tsconfig.json --noEmit false, bun, build) (push) Has been cancelled
CI / checks (pnpm build && pnpm lint, node, lint) (push) Has been cancelled
CI / checks (pnpm canvas:a2ui:bundle && bunx vitest run, bun, test) (push) Has been cancelled
CI / checks (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Has been cancelled
CI / checks (pnpm format, node, format) (push) Has been cancelled
CI / checks (pnpm protocol:check, node, protocol) (push) Has been cancelled
CI / checks (pnpm tsgo, node, tsgo) (push) Has been cancelled
CI / secrets (push) Has been cancelled
CI / checks-windows (pnpm build && pnpm lint, node, build & lint) (push) Has been cancelled
CI / checks-windows (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Has been cancelled
CI / checks-windows (pnpm protocol:check, node, protocol) (push) Has been cancelled
CI / checks-macos (pnpm test, test) (push) Has been cancelled
CI / macos-app (set -euo pipefail for attempt in 1 2 3; do if swift build --package-path apps/macos --configuration release; then exit 0 fi echo "swift build failed (attempt $attempt/3). Retrying…" sleep $((attempt * 20)) done exit 1 , build) (push) Has been cancelled
CI / macos-app (set -euo pipefail for attempt in 1 2 3; do if swift test --package-path apps/macos --parallel --enable-code-coverage --show-codecov-path; then exit 0 fi echo "swift test failed (attempt $attempt/3). Retrying…" sleep $((attempt … (push) Has been cancelled
CI / macos-app (swiftlint --config .swiftlint.yml swiftformat --lint apps/macos/Sources --config .swiftformat , lint) (push) Has been cancelled
CI / ios (push) Has been cancelled
CI / android (./gradlew --no-daemon :app:assembleDebug, build) (push) Has been cancelled
CI / android (./gradlew --no-daemon :app:testDebugUnitTest, test) (push) Has been cancelled
Workflow Sanity / no-tabs (push) Has been cancelled
- Replace single sharedClientState with Map<accountKey, ClientState>
- Add accountId parameter throughout client lifecycle
- Support multiple Matrix accounts in parallel
- Fix E2EE event deduplication
- Add debug logging for troubleshooting

Co-authored-by: Albert Hild <albert@vainplex.de>
2026-02-02 09:26:13 +01:00
20 changed files with 782 additions and 355 deletions

View file

@ -7,16 +7,14 @@ import {
type ChannelMessageActionName, type ChannelMessageActionName,
type ChannelToolSend, type ChannelToolSend,
} from "openclaw/plugin-sdk"; } from "openclaw/plugin-sdk";
import type { CoreConfig } from "./types.js";
import { resolveMatrixAccount } from "./matrix/accounts.js"; import { resolveMatrixAccount } from "./matrix/accounts.js";
import { handleMatrixAction } from "./tool-actions.js"; import { handleMatrixAction } from "./tool-actions.js";
import type { CoreConfig } from "./types.js";
export const matrixMessageActions: ChannelMessageActionAdapter = { export const matrixMessageActions: ChannelMessageActionAdapter = {
listActions: ({ cfg }) => { listActions: ({ cfg }) => {
const account = resolveMatrixAccount({ cfg: cfg as CoreConfig }); const account = resolveMatrixAccount({ cfg: cfg as CoreConfig });
if (!account.enabled || !account.configured) { if (!account.enabled || !account.configured) return [];
return [];
}
const gate = createActionGate((cfg as CoreConfig).channels?.matrix?.actions); const gate = createActionGate((cfg as CoreConfig).channels?.matrix?.actions);
const actions = new Set<ChannelMessageActionName>(["send", "poll"]); const actions = new Set<ChannelMessageActionName>(["send", "poll"]);
if (gate("reactions")) { if (gate("reactions")) {
@ -33,28 +31,23 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
actions.add("unpin"); actions.add("unpin");
actions.add("list-pins"); actions.add("list-pins");
} }
if (gate("memberInfo")) { if (gate("memberInfo")) actions.add("member-info");
actions.add("member-info"); if (gate("channelInfo")) actions.add("channel-info");
}
if (gate("channelInfo")) {
actions.add("channel-info");
}
return Array.from(actions); return Array.from(actions);
}, },
supportsAction: ({ action }) => action !== "poll", supportsAction: ({ action }) => action !== "poll",
extractToolSend: ({ args }): ChannelToolSend | null => { extractToolSend: ({ args }): ChannelToolSend | null => {
const action = typeof args.action === "string" ? args.action.trim() : ""; const action = typeof args.action === "string" ? args.action.trim() : "";
if (action !== "sendMessage") { if (action !== "sendMessage") return null;
return null;
}
const to = typeof args.to === "string" ? args.to : undefined; const to = typeof args.to === "string" ? args.to : undefined;
if (!to) { if (!to) return null;
return null;
}
return { to }; return { to };
}, },
handleAction: async (ctx: ChannelMessageActionContext) => { handleAction: async (ctx: ChannelMessageActionContext) => {
const { action, params, cfg } = ctx; const { action, params, cfg } = ctx;
// Get accountId from context for multi-account support
const accountId = (ctx as { accountId?: string }).accountId ?? undefined;
const resolveRoomId = () => const resolveRoomId = () =>
readStringParam(params, "roomId") ?? readStringParam(params, "roomId") ??
readStringParam(params, "channelId") ?? readStringParam(params, "channelId") ??
@ -77,6 +70,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
mediaUrl: mediaUrl ?? undefined, mediaUrl: mediaUrl ?? undefined,
replyToId: replyTo ?? undefined, replyToId: replyTo ?? undefined,
threadId: threadId ?? undefined, threadId: threadId ?? undefined,
accountId,
}, },
cfg, cfg,
); );
@ -93,6 +87,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
messageId, messageId,
emoji, emoji,
remove, remove,
accountId,
}, },
cfg, cfg,
); );
@ -107,6 +102,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
roomId: resolveRoomId(), roomId: resolveRoomId(),
messageId, messageId,
limit, limit,
accountId,
}, },
cfg, cfg,
); );
@ -121,6 +117,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
limit, limit,
before: readStringParam(params, "before"), before: readStringParam(params, "before"),
after: readStringParam(params, "after"), after: readStringParam(params, "after"),
accountId,
}, },
cfg, cfg,
); );
@ -135,6 +132,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
roomId: resolveRoomId(), roomId: resolveRoomId(),
messageId, messageId,
content, content,
accountId,
}, },
cfg, cfg,
); );
@ -147,6 +145,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
action: "deleteMessage", action: "deleteMessage",
roomId: resolveRoomId(), roomId: resolveRoomId(),
messageId, messageId,
accountId,
}, },
cfg, cfg,
); );
@ -163,6 +162,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins", action === "pin" ? "pinMessage" : action === "unpin" ? "unpinMessage" : "listPins",
roomId: resolveRoomId(), roomId: resolveRoomId(),
messageId, messageId,
accountId,
}, },
cfg, cfg,
); );
@ -175,6 +175,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
action: "memberInfo", action: "memberInfo",
userId, userId,
roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"), roomId: readStringParam(params, "roomId") ?? readStringParam(params, "channelId"),
accountId,
}, },
cfg, cfg,
); );
@ -185,6 +186,7 @@ export const matrixMessageActions: ChannelMessageActionAdapter = {
{ {
action: "channelInfo", action: "channelInfo",
roomId: resolveRoomId(), roomId: resolveRoomId(),
accountId,
}, },
cfg, cfg,
); );

View file

@ -9,14 +9,11 @@ import {
setAccountEnabledInConfigSection, setAccountEnabledInConfigSection,
type ChannelPlugin, type ChannelPlugin,
} from "openclaw/plugin-sdk"; } from "openclaw/plugin-sdk";
import type { CoreConfig } from "./types.js";
import { matrixMessageActions } from "./actions.js"; import { matrixMessageActions } from "./actions.js";
import { MatrixConfigSchema } from "./config-schema.js"; import { MatrixConfigSchema } from "./config-schema.js";
import { listMatrixDirectoryGroupsLive, listMatrixDirectoryPeersLive } from "./directory-live.js"; import { resolveMatrixGroupRequireMention, resolveMatrixGroupToolPolicy } from "./group-mentions.js";
import { import type { CoreConfig } from "./types.js";
resolveMatrixGroupRequireMention,
resolveMatrixGroupToolPolicy,
} from "./group-mentions.js";
import { import {
listMatrixAccountIds, listMatrixAccountIds,
resolveDefaultMatrixAccountId, resolveDefaultMatrixAccountId,
@ -24,12 +21,17 @@ import {
type ResolvedMatrixAccount, type ResolvedMatrixAccount,
} from "./matrix/accounts.js"; } from "./matrix/accounts.js";
import { resolveMatrixAuth } from "./matrix/client.js"; import { resolveMatrixAuth } from "./matrix/client.js";
import { importMatrixIndex } from "./matrix/import-mutex.js";
import { normalizeAllowListLower } from "./matrix/monitor/allowlist.js"; import { normalizeAllowListLower } from "./matrix/monitor/allowlist.js";
import { probeMatrix } from "./matrix/probe.js"; import { probeMatrix } from "./matrix/probe.js";
import { sendMessageMatrix } from "./matrix/send.js"; import { sendMessageMatrix } from "./matrix/send.js";
import { matrixOnboardingAdapter } from "./onboarding.js"; import { matrixOnboardingAdapter } from "./onboarding.js";
import { matrixOutbound } from "./outbound.js"; import { matrixOutbound } from "./outbound.js";
import { resolveMatrixTargets } from "./resolve-targets.js"; import { resolveMatrixTargets } from "./resolve-targets.js";
import {
listMatrixDirectoryGroupsLive,
listMatrixDirectoryPeersLive,
} from "./directory-live.js";
const meta = { const meta = {
id: "matrix", id: "matrix",
@ -44,9 +46,7 @@ const meta = {
function normalizeMatrixMessagingTarget(raw: string): string | undefined { function normalizeMatrixMessagingTarget(raw: string): string | undefined {
let normalized = raw.trim(); let normalized = raw.trim();
if (!normalized) { if (!normalized) return undefined;
return undefined;
}
const lowered = normalized.toLowerCase(); const lowered = normalized.toLowerCase();
if (lowered.startsWith("matrix:")) { if (lowered.startsWith("matrix:")) {
normalized = normalized.slice("matrix:".length).trim(); normalized = normalized.slice("matrix:".length).trim();
@ -109,7 +109,8 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
configSchema: buildChannelConfigSchema(MatrixConfigSchema), configSchema: buildChannelConfigSchema(MatrixConfigSchema),
config: { config: {
listAccountIds: (cfg) => listMatrixAccountIds(cfg as CoreConfig), listAccountIds: (cfg) => listMatrixAccountIds(cfg as CoreConfig),
resolveAccount: (cfg, accountId) => resolveMatrixAccount({ cfg: cfg as CoreConfig, accountId }), resolveAccount: (cfg, accountId) =>
resolveMatrixAccount({ cfg: cfg as CoreConfig, accountId }),
defaultAccountId: (cfg) => resolveDefaultMatrixAccountId(cfg as CoreConfig), defaultAccountId: (cfg) => resolveDefaultMatrixAccountId(cfg as CoreConfig),
setAccountEnabled: ({ cfg, accountId, enabled }) => setAccountEnabled: ({ cfg, accountId, enabled }) =>
setAccountEnabledInConfigSection({ setAccountEnabledInConfigSection({
@ -153,20 +154,15 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
policyPath: "channels.matrix.dm.policy", policyPath: "channels.matrix.dm.policy",
allowFromPath: "channels.matrix.dm.allowFrom", allowFromPath: "channels.matrix.dm.allowFrom",
approveHint: formatPairingApproveHint("matrix"), approveHint: formatPairingApproveHint("matrix"),
normalizeEntry: (raw) => normalizeEntry: (raw) => raw.replace(/^matrix:/i, "").trim().toLowerCase(),
raw
.replace(/^matrix:/i, "")
.trim()
.toLowerCase(),
}), }),
collectWarnings: ({ account, cfg }) => { collectWarnings: ({ account, cfg }) => {
const defaultGroupPolicy = (cfg as CoreConfig).channels?.defaults?.groupPolicy; const defaultGroupPolicy = (cfg as CoreConfig).channels?.defaults?.groupPolicy;
const groupPolicy = account.config.groupPolicy ?? defaultGroupPolicy ?? "allowlist"; const groupPolicy =
if (groupPolicy !== "open") { account.config.groupPolicy ?? defaultGroupPolicy ?? "allowlist";
return []; if (groupPolicy !== "open") return [];
}
return [ return [
'- Matrix rooms: groupPolicy="open" allows any room to trigger (mention-gated). Set channels.matrix.groupPolicy="allowlist" + channels.matrix.groups (and optionally channels.matrix.groupAllowFrom) to restrict rooms.', "- Matrix rooms: groupPolicy=\"open\" allows any room to trigger (mention-gated). Set channels.matrix.groupPolicy=\"allowlist\" + channels.matrix.groups (and optionally channels.matrix.groupAllowFrom) to restrict rooms.",
]; ];
}, },
}, },
@ -175,13 +171,16 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
resolveToolPolicy: resolveMatrixGroupToolPolicy, resolveToolPolicy: resolveMatrixGroupToolPolicy,
}, },
threading: { threading: {
resolveReplyToMode: ({ cfg }) => (cfg as CoreConfig).channels?.matrix?.replyToMode ?? "off", resolveReplyToMode: ({ cfg }) =>
(cfg as CoreConfig).channels?.matrix?.replyToMode ?? "off",
buildToolContext: ({ context, hasRepliedRef }) => { buildToolContext: ({ context, hasRepliedRef }) => {
const currentTarget = context.To; const currentTarget = context.To;
return { return {
currentChannelId: currentTarget?.trim() || undefined, currentChannelId: currentTarget?.trim() || undefined,
currentThreadTs: currentThreadTs:
context.MessageThreadId != null ? String(context.MessageThreadId) : context.ReplyToId, context.MessageThreadId != null
? String(context.MessageThreadId)
: context.ReplyToId,
hasRepliedRef, hasRepliedRef,
}; };
}, },
@ -191,12 +190,8 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
targetResolver: { targetResolver: {
looksLikeId: (raw) => { looksLikeId: (raw) => {
const trimmed = raw.trim(); const trimmed = raw.trim();
if (!trimmed) { if (!trimmed) return false;
return false; if (/^(matrix:)?[!#@]/i.test(trimmed)) return true;
}
if (/^(matrix:)?[!#@]/i.test(trimmed)) {
return true;
}
return trimmed.includes(":"); return trimmed.includes(":");
}, },
hint: "<room|alias|user>", hint: "<room|alias|user>",
@ -211,17 +206,13 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
for (const entry of account.config.dm?.allowFrom ?? []) { for (const entry of account.config.dm?.allowFrom ?? []) {
const raw = String(entry).trim(); const raw = String(entry).trim();
if (!raw || raw === "*") { if (!raw || raw === "*") continue;
continue;
}
ids.add(raw.replace(/^matrix:/i, "")); ids.add(raw.replace(/^matrix:/i, ""));
} }
for (const entry of account.config.groupAllowFrom ?? []) { for (const entry of account.config.groupAllowFrom ?? []) {
const raw = String(entry).trim(); const raw = String(entry).trim();
if (!raw || raw === "*") { if (!raw || raw === "*") continue;
continue;
}
ids.add(raw.replace(/^matrix:/i, "")); ids.add(raw.replace(/^matrix:/i, ""));
} }
@ -229,9 +220,7 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
for (const room of Object.values(groups)) { for (const room of Object.values(groups)) {
for (const entry of room.users ?? []) { for (const entry of room.users ?? []) {
const raw = String(entry).trim(); const raw = String(entry).trim();
if (!raw || raw === "*") { if (!raw || raw === "*") continue;
continue;
}
ids.add(raw.replace(/^matrix:/i, "")); ids.add(raw.replace(/^matrix:/i, ""));
} }
} }
@ -242,9 +231,7 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
.map((raw) => { .map((raw) => {
const lowered = raw.toLowerCase(); const lowered = raw.toLowerCase();
const cleaned = lowered.startsWith("user:") ? raw.slice("user:".length).trim() : raw; const cleaned = lowered.startsWith("user:") ? raw.slice("user:".length).trim() : raw;
if (cleaned.startsWith("@")) { if (cleaned.startsWith("@")) return `user:${cleaned}`;
return `user:${cleaned}`;
}
return cleaned; return cleaned;
}) })
.filter((id) => (q ? id.toLowerCase().includes(q) : true)) .filter((id) => (q ? id.toLowerCase().includes(q) : true))
@ -269,12 +256,8 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
.map((raw) => raw.replace(/^matrix:/i, "")) .map((raw) => raw.replace(/^matrix:/i, ""))
.map((raw) => { .map((raw) => {
const lowered = raw.toLowerCase(); const lowered = raw.toLowerCase();
if (lowered.startsWith("room:") || lowered.startsWith("channel:")) { if (lowered.startsWith("room:") || lowered.startsWith("channel:")) return raw;
return raw; if (raw.startsWith("!")) return `room:${raw}`;
}
if (raw.startsWith("!")) {
return `room:${raw}`;
}
return raw; return raw;
}) })
.filter((id) => (q ? id.toLowerCase().includes(q) : true)) .filter((id) => (q ? id.toLowerCase().includes(q) : true))
@ -302,12 +285,8 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
name, name,
}), }),
validateInput: ({ input }) => { validateInput: ({ input }) => {
if (input.useEnv) { if (input.useEnv) return null;
return null; if (!input.homeserver?.trim()) return "Matrix requires --homeserver";
}
if (!input.homeserver?.trim()) {
return "Matrix requires --homeserver";
}
const accessToken = input.accessToken?.trim(); const accessToken = input.accessToken?.trim();
const password = input.password?.trim(); const password = input.password?.trim();
const userId = input.userId?.trim(); const userId = input.userId?.trim();
@ -315,12 +294,8 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
return "Matrix requires --access-token or --password"; return "Matrix requires --access-token or --password";
} }
if (!accessToken) { if (!accessToken) {
if (!userId) { if (!userId) return "Matrix requires --user-id when using --password";
return "Matrix requires --user-id when using --password"; if (!password) return "Matrix requires --password when using --user-id";
}
if (!password) {
return "Matrix requires --password when using --user-id";
}
} }
return null; return null;
}, },
@ -365,9 +340,7 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
collectStatusIssues: (accounts) => collectStatusIssues: (accounts) =>
accounts.flatMap((account) => { accounts.flatMap((account) => {
const lastError = typeof account.lastError === "string" ? account.lastError.trim() : ""; const lastError = typeof account.lastError === "string" ? account.lastError.trim() : "";
if (!lastError) { if (!lastError) return [];
return [];
}
return [ return [
{ {
channel: "matrix", channel: "matrix",
@ -387,7 +360,7 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
probe: snapshot.probe, probe: snapshot.probe,
lastProbeAt: snapshot.lastProbeAt ?? null, lastProbeAt: snapshot.lastProbeAt ?? null,
}), }),
probeAccount: async ({ timeoutMs, cfg }) => { probeAccount: async ({ account, timeoutMs, cfg }) => {
try { try {
const auth = await resolveMatrixAuth({ cfg: cfg as CoreConfig }); const auth = await resolveMatrixAuth({ cfg: cfg as CoreConfig });
return await probeMatrix({ return await probeMatrix({
@ -427,9 +400,12 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
accountId: account.accountId, accountId: account.accountId,
baseUrl: account.homeserver, baseUrl: account.homeserver,
}); });
ctx.log?.info(`[${account.accountId}] starting provider (${account.homeserver ?? "matrix"})`); ctx.log?.info(
`[${account.accountId}] starting provider (${account.homeserver ?? "matrix"})`,
);
// Lazy import: the monitor pulls the reply pipeline; avoid ESM init cycles. // Lazy import: the monitor pulls the reply pipeline; avoid ESM init cycles.
const { monitorMatrixProvider } = await import("./matrix/index.js"); // Use serialized import to prevent race conditions during parallel account startup.
const { monitorMatrixProvider } = await importMatrixIndex();
return monitorMatrixProvider({ return monitorMatrixProvider({
runtime: ctx.runtime, runtime: ctx.runtime,
abortSignal: ctx.abortSignal, abortSignal: ctx.abortSignal,

View file

@ -1,5 +1,5 @@
import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "openclaw/plugin-sdk"; import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "openclaw/plugin-sdk";
import type { CoreConfig, MatrixConfig } from "../types.js"; import type { CoreConfig, MatrixAccountConfig, MatrixConfig } from "../types.js";
import { resolveMatrixConfig } from "./client.js"; import { resolveMatrixConfig } from "./client.js";
import { credentialsMatchConfig, loadMatrixCredentials } from "./credentials.js"; import { credentialsMatchConfig, loadMatrixCredentials } from "./credentials.js";
@ -10,56 +10,155 @@ export type ResolvedMatrixAccount = {
configured: boolean; configured: boolean;
homeserver?: string; homeserver?: string;
userId?: string; userId?: string;
config: MatrixConfig; accessToken?: string;
config: MatrixAccountConfig;
}; };
export function listMatrixAccountIds(_cfg: CoreConfig): string[] { /**
return [DEFAULT_ACCOUNT_ID]; * List account IDs explicitly configured in channels.matrix.accounts
*/
function listConfiguredAccountIds(cfg: CoreConfig): string[] {
const accounts = cfg.channels?.matrix?.accounts;
if (!accounts || typeof accounts !== "object") {
return [];
}
const ids = new Set<string>();
for (const key of Object.keys(accounts)) {
if (!key) continue;
ids.add(normalizeAccountId(key));
}
return [...ids];
}
/**
* List account IDs referenced in bindings for matrix channel
*/
function listBoundAccountIds(cfg: CoreConfig): string[] {
const bindings = cfg.bindings;
if (!Array.isArray(bindings)) return [];
const ids = new Set<string>();
for (const binding of bindings) {
if (binding.match?.channel === "matrix" && binding.match?.accountId) {
ids.add(normalizeAccountId(binding.match.accountId));
}
}
return [...ids];
}
/**
* List all Matrix account IDs (configured + bound)
*/
export function listMatrixAccountIds(cfg: CoreConfig): string[] {
const ids = Array.from(
new Set([
DEFAULT_ACCOUNT_ID,
...listConfiguredAccountIds(cfg),
...listBoundAccountIds(cfg),
]),
);
return ids.toSorted((a, b) => a.localeCompare(b));
} }
export function resolveDefaultMatrixAccountId(cfg: CoreConfig): string { export function resolveDefaultMatrixAccountId(cfg: CoreConfig): string {
const ids = listMatrixAccountIds(cfg); const ids = listMatrixAccountIds(cfg);
if (ids.includes(DEFAULT_ACCOUNT_ID)) { if (ids.includes(DEFAULT_ACCOUNT_ID)) return DEFAULT_ACCOUNT_ID;
return DEFAULT_ACCOUNT_ID;
}
return ids[0] ?? DEFAULT_ACCOUNT_ID; return ids[0] ?? DEFAULT_ACCOUNT_ID;
} }
/**
* Get account-specific config from channels.matrix.accounts[accountId]
*/
function resolveAccountConfig(
cfg: CoreConfig,
accountId: string,
): MatrixAccountConfig | undefined {
const accounts = cfg.channels?.matrix?.accounts;
if (!accounts || typeof accounts !== "object") {
return undefined;
}
const direct = accounts[accountId] as MatrixAccountConfig | undefined;
if (direct) return direct;
const normalized = normalizeAccountId(accountId);
const matchKey = Object.keys(accounts).find(
(key) => normalizeAccountId(key) === normalized
);
return matchKey ? (accounts[matchKey] as MatrixAccountConfig | undefined) : undefined;
}
/**
* Merge base matrix config with account-specific overrides
*/
function mergeMatrixAccountConfig(cfg: CoreConfig, accountId: string): MatrixAccountConfig {
const base = cfg.channels?.matrix ?? {};
// Extract base config without 'accounts' key
const { accounts: _ignored, ...baseConfig } = base as MatrixConfig;
const accountConfig = resolveAccountConfig(cfg, accountId) ?? {};
// Account config overrides base config
return { ...baseConfig, ...accountConfig };
}
export function resolveMatrixAccount(params: { export function resolveMatrixAccount(params: {
cfg: CoreConfig; cfg: CoreConfig;
accountId?: string | null; accountId?: string | null;
}): ResolvedMatrixAccount { }): ResolvedMatrixAccount {
const accountId = normalizeAccountId(params.accountId); const accountId = normalizeAccountId(params.accountId);
const base = params.cfg.channels?.matrix ?? {}; const merged = mergeMatrixAccountConfig(params.cfg, accountId);
const enabled = base.enabled !== false;
const resolved = resolveMatrixConfig(params.cfg, process.env); // Check if this is a non-default account - use account-specific auth
const hasHomeserver = Boolean(resolved.homeserver); const isDefaultAccount = accountId === DEFAULT_ACCOUNT_ID || accountId === "default";
const hasUserId = Boolean(resolved.userId);
const hasAccessToken = Boolean(resolved.accessToken); // For non-default accounts, use account-specific credentials
const hasPassword = Boolean(resolved.password); // For default account, use base config or env
let homeserver = merged.homeserver;
let userId = merged.userId;
let accessToken = merged.accessToken;
if (isDefaultAccount) {
// Default account can fall back to env vars
const resolved = resolveMatrixConfig(params.cfg, process.env);
homeserver = homeserver || resolved.homeserver;
userId = userId || resolved.userId;
accessToken = accessToken || resolved.accessToken;
}
const baseEnabled = params.cfg.channels?.matrix?.enabled !== false;
const accountEnabled = merged.enabled !== false;
const enabled = baseEnabled && accountEnabled;
const hasHomeserver = Boolean(homeserver);
const hasAccessToken = Boolean(accessToken);
const hasPassword = Boolean(merged.password);
const hasUserId = Boolean(userId);
const hasPasswordAuth = hasUserId && hasPassword; const hasPasswordAuth = hasUserId && hasPassword;
const stored = loadMatrixCredentials(process.env);
// Check for stored credentials (only for default account)
const stored = isDefaultAccount ? loadMatrixCredentials(process.env) : null;
const hasStored = const hasStored =
stored && resolved.homeserver stored && homeserver
? credentialsMatchConfig(stored, { ? credentialsMatchConfig(stored, {
homeserver: resolved.homeserver, homeserver: homeserver,
userId: resolved.userId || "", userId: userId || "",
}) })
: false; : false;
const configured = hasHomeserver && (hasAccessToken || hasPasswordAuth || Boolean(hasStored)); const configured = hasHomeserver && (hasAccessToken || hasPasswordAuth || Boolean(hasStored));
return { return {
accountId, accountId,
enabled, enabled,
name: base.name?.trim() || undefined, name: merged.name?.trim() || undefined,
configured, configured,
homeserver: resolved.homeserver || undefined, homeserver: homeserver || undefined,
userId: resolved.userId || undefined, userId: userId || undefined,
config: base, accessToken: accessToken || undefined,
config: merged,
}; };
} }
export function listEnabledMatrixAccounts(cfg: CoreConfig): ResolvedMatrixAccount[] { export function listEnabledMatrixAccounts(cfg: CoreConfig): ResolvedMatrixAccount[] {
return listMatrixAccountIds(cfg) return listMatrixAccountIds(cfg)
.map((accountId) => resolveMatrixAccount({ cfg, accountId })) .map((accountId) => resolveMatrixAccount({ cfg, accountId }))
.filter((account) => account.enabled); .filter((account) => account.enabled && account.configured);
} }

View file

@ -1,6 +1,5 @@
import type { CoreConfig } from "../types.js";
import type { MatrixActionClient, MatrixActionClientOpts } from "./types.js";
import { getMatrixRuntime } from "../../runtime.js"; import { getMatrixRuntime } from "../../runtime.js";
import type { CoreConfig } from "../types.js";
import { getActiveMatrixClient } from "../active-client.js"; import { getActiveMatrixClient } from "../active-client.js";
import { import {
createMatrixClient, createMatrixClient,
@ -8,6 +7,7 @@ import {
resolveMatrixAuth, resolveMatrixAuth,
resolveSharedMatrixClient, resolveSharedMatrixClient,
} from "../client.js"; } from "../client.js";
import type { MatrixActionClient, MatrixActionClientOpts } from "./types.js";
export function ensureNodeRuntime() { export function ensureNodeRuntime() {
if (isBunRuntime()) { if (isBunRuntime()) {
@ -19,23 +19,24 @@ export async function resolveActionClient(
opts: MatrixActionClientOpts = {}, opts: MatrixActionClientOpts = {},
): Promise<MatrixActionClient> { ): Promise<MatrixActionClient> {
ensureNodeRuntime(); ensureNodeRuntime();
if (opts.client) { if (opts.client) return { client: opts.client, stopOnDone: false };
return { client: opts.client, stopOnDone: false };
} // Try to get the active client for the specified account
const active = getActiveMatrixClient(); const active = getActiveMatrixClient(opts.accountId);
if (active) { if (active) return { client: active, stopOnDone: false };
return { client: active, stopOnDone: false };
}
const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT); const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT);
if (shouldShareClient) { if (shouldShareClient) {
const client = await resolveSharedMatrixClient({ const client = await resolveSharedMatrixClient({
cfg: getMatrixRuntime().config.loadConfig() as CoreConfig, cfg: getMatrixRuntime().config.loadConfig() as CoreConfig,
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
}); });
return { client, stopOnDone: false }; return { client, stopOnDone: false };
} }
const auth = await resolveMatrixAuth({ const auth = await resolveMatrixAuth({
cfg: getMatrixRuntime().config.loadConfig() as CoreConfig, cfg: getMatrixRuntime().config.loadConfig() as CoreConfig,
accountId: opts.accountId ?? undefined,
}); });
const client = await createMatrixClient({ const client = await createMatrixClient({
homeserver: auth.homeserver, homeserver: auth.homeserver,
@ -43,6 +44,7 @@ export async function resolveActionClient(
accessToken: auth.accessToken, accessToken: auth.accessToken,
encryption: auth.encryption, encryption: auth.encryption,
localTimeoutMs: opts.timeoutMs, localTimeoutMs: opts.timeoutMs,
accountId: opts.accountId ?? undefined,
}); });
if (auth.encryption && client.crypto) { if (auth.encryption && client.crypto) {
try { try {

View file

@ -1,6 +1,3 @@
import { resolveMatrixRoomId, sendMessageMatrix } from "../send.js";
import { resolveActionClient } from "./client.js";
import { summarizeMatrixRawEvent } from "./summary.js";
import { import {
EventType, EventType,
MsgType, MsgType,
@ -10,6 +7,9 @@ import {
type MatrixRawEvent, type MatrixRawEvent,
type RoomMessageEventContent, type RoomMessageEventContent,
} from "./types.js"; } from "./types.js";
import { resolveActionClient } from "./client.js";
import { summarizeMatrixRawEvent } from "./summary.js";
import { resolveMatrixRoomId, sendMessageMatrix } from "../send.js";
export async function sendMatrixMessage( export async function sendMatrixMessage(
to: string, to: string,
@ -26,6 +26,7 @@ export async function sendMatrixMessage(
threadId: opts.threadId, threadId: opts.threadId,
client: opts.client, client: opts.client,
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
}); });
} }
@ -36,9 +37,7 @@ export async function editMatrixMessage(
opts: MatrixActionClientOpts = {}, opts: MatrixActionClientOpts = {},
) { ) {
const trimmed = content.trim(); const trimmed = content.trim();
if (!trimmed) { if (!trimmed) throw new Error("Matrix edit requires content");
throw new Error("Matrix edit requires content");
}
const { client, stopOnDone } = await resolveActionClient(opts); const { client, stopOnDone } = await resolveActionClient(opts);
try { try {
const resolvedRoom = await resolveMatrixRoomId(client, roomId); const resolvedRoom = await resolveMatrixRoomId(client, roomId);
@ -58,9 +57,7 @@ export async function editMatrixMessage(
const eventId = await client.sendMessage(resolvedRoom, payload); const eventId = await client.sendMessage(resolvedRoom, payload);
return { eventId: eventId ?? null }; return { eventId: eventId ?? null };
} finally { } finally {
if (stopOnDone) { if (stopOnDone) client.stop();
client.stop();
}
} }
} }
@ -74,9 +71,7 @@ export async function deleteMatrixMessage(
const resolvedRoom = await resolveMatrixRoomId(client, roomId); const resolvedRoom = await resolveMatrixRoomId(client, roomId);
await client.redactEvent(resolvedRoom, messageId, opts.reason); await client.redactEvent(resolvedRoom, messageId, opts.reason);
} finally { } finally {
if (stopOnDone) { if (stopOnDone) client.stop();
client.stop();
}
} }
} }
@ -102,7 +97,7 @@ export async function readMatrixMessages(
const token = opts.before?.trim() || opts.after?.trim() || undefined; const token = opts.before?.trim() || opts.after?.trim() || undefined;
const dir = opts.after ? "f" : "b"; const dir = opts.after ? "f" : "b";
// @vector-im/matrix-bot-sdk uses doRequest for room messages // @vector-im/matrix-bot-sdk uses doRequest for room messages
const res = (await client.doRequest( const res = await client.doRequest(
"GET", "GET",
`/_matrix/client/v3/rooms/${encodeURIComponent(resolvedRoom)}/messages`, `/_matrix/client/v3/rooms/${encodeURIComponent(resolvedRoom)}/messages`,
{ {
@ -110,7 +105,7 @@ export async function readMatrixMessages(
limit, limit,
from: token, from: token,
}, },
)) as { chunk: MatrixRawEvent[]; start?: string; end?: string }; ) as { chunk: MatrixRawEvent[]; start?: string; end?: string };
const messages = res.chunk const messages = res.chunk
.filter((event) => event.type === EventType.RoomMessage) .filter((event) => event.type === EventType.RoomMessage)
.filter((event) => !event.unsigned?.redacted_because) .filter((event) => !event.unsigned?.redacted_because)
@ -121,8 +116,6 @@ export async function readMatrixMessages(
prevBatch: res.start ?? null, prevBatch: res.start ?? null,
}; };
} finally { } finally {
if (stopOnDone) { if (stopOnDone) client.stop();
client.stop();
}
} }
} }

View file

@ -57,6 +57,7 @@ export type MatrixRawEvent = {
export type MatrixActionClientOpts = { export type MatrixActionClientOpts = {
client?: MatrixClient; client?: MatrixClient;
timeoutMs?: number; timeoutMs?: number;
accountId?: string | null;
}; };
export type MatrixMessageSummary = { export type MatrixMessageSummary = {

View file

@ -1,11 +1,34 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
let activeClient: MatrixClient | null = null; const DEFAULT_ACCOUNT_KEY = "default";
export function setActiveMatrixClient(client: MatrixClient | null): void { // Multi-account: Map of accountId -> client
activeClient = client; const activeClients = new Map<string, MatrixClient>();
function normalizeAccountKey(accountId?: string | null): string {
return accountId?.trim().toLowerCase() || DEFAULT_ACCOUNT_KEY;
} }
export function getActiveMatrixClient(): MatrixClient | null { export function setActiveMatrixClient(client: MatrixClient | null, accountId?: string | null): void {
return activeClient; const key = normalizeAccountKey(accountId);
if (client) {
activeClients.set(key, client);
} else {
activeClients.delete(key);
}
}
export function getActiveMatrixClient(accountId?: string | null): MatrixClient | null {
const key = normalizeAccountKey(accountId);
const client = activeClients.get(key);
if (client) return client;
// Fallback: if specific account not found, try default
if (key !== DEFAULT_ACCOUNT_KEY) {
return activeClients.get(DEFAULT_ACCOUNT_KEY) ?? null;
}
return null;
}
export function listActiveMatrixClients(): Array<{ accountId: string; client: MatrixClient }> {
return Array.from(activeClients.entries()).map(([accountId, client]) => ({ accountId, client }));
} }

View file

@ -1,28 +1,70 @@
import { MatrixClient } from "@vector-im/matrix-bot-sdk"; import { MatrixClient } from "@vector-im/matrix-bot-sdk";
import type { CoreConfig } from "../types.js"; import { DEFAULT_ACCOUNT_ID, normalizeAccountId } from "openclaw/plugin-sdk";
import type { MatrixAuth, MatrixResolvedConfig } from "./types.js";
import type { CoreConfig, MatrixAccountConfig, MatrixConfig } from "../types.js";
import { getMatrixRuntime } from "../../runtime.js"; import { getMatrixRuntime } from "../../runtime.js";
import { ensureMatrixSdkLoggingConfigured } from "./logging.js"; import { ensureMatrixSdkLoggingConfigured } from "./logging.js";
import type { MatrixAuth, MatrixResolvedConfig } from "./types.js";
import { importCredentials } from "../import-mutex.js";
function clean(value?: string): string { function clean(value?: string): string {
return value?.trim() ?? ""; return value?.trim() ?? "";
} }
/**
* Get account-specific config from channels.matrix.accounts[accountId]
*/
function resolveAccountConfig(
cfg: CoreConfig,
accountId: string,
): MatrixAccountConfig | undefined {
const accounts = cfg.channels?.matrix?.accounts;
if (!accounts || typeof accounts !== "object") {
return undefined;
}
const direct = accounts[accountId] as MatrixAccountConfig | undefined;
if (direct) return direct;
const normalized = normalizeAccountId(accountId);
const matchKey = Object.keys(accounts).find(
(key) => normalizeAccountId(key) === normalized
);
return matchKey ? (accounts[matchKey] as MatrixAccountConfig | undefined) : undefined;
}
/**
* Merge base matrix config with account-specific overrides
*/
function mergeMatrixAccountConfig(cfg: CoreConfig, accountId: string): MatrixAccountConfig {
const base = cfg.channels?.matrix ?? {};
const { accounts: _ignored, ...baseConfig } = base as MatrixConfig;
const accountConfig = resolveAccountConfig(cfg, accountId) ?? {};
return { ...baseConfig, ...accountConfig };
}
export function resolveMatrixConfig( export function resolveMatrixConfig(
cfg: CoreConfig = getMatrixRuntime().config.loadConfig() as CoreConfig, cfg: CoreConfig = getMatrixRuntime().config.loadConfig() as CoreConfig,
env: NodeJS.ProcessEnv = process.env, env: NodeJS.ProcessEnv = process.env,
accountId?: string,
): MatrixResolvedConfig { ): MatrixResolvedConfig {
const matrix = cfg.channels?.matrix ?? {}; const normalizedAccountId = normalizeAccountId(accountId);
const homeserver = clean(matrix.homeserver) || clean(env.MATRIX_HOMESERVER); const isDefaultAccount = normalizedAccountId === DEFAULT_ACCOUNT_ID || normalizedAccountId === "default";
const userId = clean(matrix.userId) || clean(env.MATRIX_USER_ID);
const accessToken = clean(matrix.accessToken) || clean(env.MATRIX_ACCESS_TOKEN) || undefined; // Get merged config for this account
const password = clean(matrix.password) || clean(env.MATRIX_PASSWORD) || undefined; const merged = mergeMatrixAccountConfig(cfg, normalizedAccountId);
const deviceName = clean(matrix.deviceName) || clean(env.MATRIX_DEVICE_NAME) || undefined;
// For default account, allow env var fallbacks
const homeserver = clean(merged.homeserver) || (isDefaultAccount ? clean(env.MATRIX_HOMESERVER) : "");
const userId = clean(merged.userId) || (isDefaultAccount ? clean(env.MATRIX_USER_ID) : "");
const accessToken = clean(merged.accessToken) || (isDefaultAccount ? clean(env.MATRIX_ACCESS_TOKEN) : "") || undefined;
const password = clean(merged.password) || (isDefaultAccount ? clean(env.MATRIX_PASSWORD) : "") || undefined;
const deviceName = clean(merged.deviceName) || (isDefaultAccount ? clean(env.MATRIX_DEVICE_NAME) : "") || undefined;
const initialSyncLimit = const initialSyncLimit =
typeof matrix.initialSyncLimit === "number" typeof merged.initialSyncLimit === "number"
? Math.max(0, Math.floor(matrix.initialSyncLimit)) ? Math.max(0, Math.floor(merged.initialSyncLimit))
: undefined; : undefined;
const encryption = matrix.encryption ?? false; const encryption = merged.encryption ?? false;
return { return {
homeserver, homeserver,
userId, userId,
@ -37,22 +79,30 @@ export function resolveMatrixConfig(
export async function resolveMatrixAuth(params?: { export async function resolveMatrixAuth(params?: {
cfg?: CoreConfig; cfg?: CoreConfig;
env?: NodeJS.ProcessEnv; env?: NodeJS.ProcessEnv;
accountId?: string;
}): Promise<MatrixAuth> { }): Promise<MatrixAuth> {
const cfg = params?.cfg ?? (getMatrixRuntime().config.loadConfig() as CoreConfig); const cfg = params?.cfg ?? (getMatrixRuntime().config.loadConfig() as CoreConfig);
const env = params?.env ?? process.env; const env = params?.env ?? process.env;
const resolved = resolveMatrixConfig(cfg, env); const accountId = params?.accountId;
const resolved = resolveMatrixConfig(cfg, env, accountId);
if (!resolved.homeserver) { if (!resolved.homeserver) {
throw new Error("Matrix homeserver is required (matrix.homeserver)"); throw new Error(`Matrix homeserver is required for account ${accountId ?? "default"} (matrix.homeserver)`);
} }
const normalizedAccountId = normalizeAccountId(accountId);
const isDefaultAccount = normalizedAccountId === DEFAULT_ACCOUNT_ID || normalizedAccountId === "default";
// Only use cached credentials for default account
// Use serialized import to prevent race conditions during parallel account startup
const { const {
loadMatrixCredentials, loadMatrixCredentials,
saveMatrixCredentials, saveMatrixCredentials,
credentialsMatchConfig, credentialsMatchConfig,
touchMatrixCredentials, touchMatrixCredentials,
} = await import("../credentials.js"); } = await importCredentials();
const cached = loadMatrixCredentials(env); const cached = isDefaultAccount ? loadMatrixCredentials(env) : null;
const cachedCredentials = const cachedCredentials =
cached && cached &&
credentialsMatchConfig(cached, { credentialsMatchConfig(cached, {
@ -71,13 +121,15 @@ export async function resolveMatrixAuth(params?: {
const tempClient = new MatrixClient(resolved.homeserver, resolved.accessToken); const tempClient = new MatrixClient(resolved.homeserver, resolved.accessToken);
const whoami = await tempClient.getUserId(); const whoami = await tempClient.getUserId();
userId = whoami; userId = whoami;
// Save the credentials with the fetched userId // Only save credentials for default account
saveMatrixCredentials({ if (isDefaultAccount) {
homeserver: resolved.homeserver, saveMatrixCredentials({
userId, homeserver: resolved.homeserver,
accessToken: resolved.accessToken, userId,
}); accessToken: resolved.accessToken,
} else if (cachedCredentials && cachedCredentials.accessToken === resolved.accessToken) { });
}
} else if (isDefaultAccount && cachedCredentials && cachedCredentials.accessToken === resolved.accessToken) {
touchMatrixCredentials(env); touchMatrixCredentials(env);
} }
return { return {
@ -90,7 +142,8 @@ export async function resolveMatrixAuth(params?: {
}; };
} }
if (cachedCredentials) { // Try cached credentials (only for default account)
if (isDefaultAccount && cachedCredentials) {
touchMatrixCredentials(env); touchMatrixCredentials(env);
return { return {
homeserver: cachedCredentials.homeserver, homeserver: cachedCredentials.homeserver,
@ -103,12 +156,14 @@ export async function resolveMatrixAuth(params?: {
} }
if (!resolved.userId) { if (!resolved.userId) {
throw new Error("Matrix userId is required when no access token is configured (matrix.userId)"); throw new Error(
`Matrix userId is required for account ${accountId ?? "default"} when no access token is configured`,
);
} }
if (!resolved.password) { if (!resolved.password) {
throw new Error( throw new Error(
"Matrix password is required when no access token is configured (matrix.password)", `Matrix password is required for account ${accountId ?? "default"} when no access token is configured`,
); );
} }
@ -126,7 +181,7 @@ export async function resolveMatrixAuth(params?: {
if (!loginResponse.ok) { if (!loginResponse.ok) {
const errorText = await loginResponse.text(); const errorText = await loginResponse.text();
throw new Error(`Matrix login failed: ${errorText}`); throw new Error(`Matrix login failed for account ${accountId ?? "default"}: ${errorText}`);
} }
const login = (await loginResponse.json()) as { const login = (await loginResponse.json()) as {
@ -137,7 +192,7 @@ export async function resolveMatrixAuth(params?: {
const accessToken = login.access_token?.trim(); const accessToken = login.access_token?.trim();
if (!accessToken) { if (!accessToken) {
throw new Error("Matrix login did not return an access token"); throw new Error(`Matrix login did not return an access token for account ${accountId ?? "default"}`);
} }
const auth: MatrixAuth = { const auth: MatrixAuth = {
@ -149,12 +204,15 @@ export async function resolveMatrixAuth(params?: {
encryption: resolved.encryption, encryption: resolved.encryption,
}; };
saveMatrixCredentials({ // Only save credentials for default account
homeserver: auth.homeserver, if (isDefaultAccount) {
userId: auth.userId, saveMatrixCredentials({
accessToken: auth.accessToken, homeserver: auth.homeserver,
deviceId: login.device_id, userId: auth.userId,
}); accessToken: auth.accessToken,
deviceId: login.device_id,
});
}
return auth; return auth;
} }

View file

@ -1,11 +1,15 @@
import type { IStorageProvider, ICryptoStorageProvider } from "@vector-im/matrix-bot-sdk"; import fs from "node:fs";
import { import {
LogService, LogService,
MatrixClient, MatrixClient,
SimpleFsStorageProvider, SimpleFsStorageProvider,
RustSdkCryptoStorageProvider, RustSdkCryptoStorageProvider,
} from "@vector-im/matrix-bot-sdk"; } from "@vector-im/matrix-bot-sdk";
import fs from "node:fs";
import { importCryptoNodejs } from "../import-mutex.js";
import type { IStorageProvider, ICryptoStorageProvider } from "@vector-im/matrix-bot-sdk";
import { ensureMatrixSdkLoggingConfigured } from "./logging.js"; import { ensureMatrixSdkLoggingConfigured } from "./logging.js";
import { import {
maybeMigrateLegacyStorage, maybeMigrateLegacyStorage,
@ -14,9 +18,7 @@ import {
} from "./storage.js"; } from "./storage.js";
function sanitizeUserIdList(input: unknown, label: string): string[] { function sanitizeUserIdList(input: unknown, label: string): string[] {
if (input == null) { if (input == null) return [];
return [];
}
if (!Array.isArray(input)) { if (!Array.isArray(input)) {
LogService.warn( LogService.warn(
"MatrixClientLite", "MatrixClientLite",
@ -65,14 +67,14 @@ export async function createMatrixClient(params: {
fs.mkdirSync(storagePaths.cryptoPath, { recursive: true }); fs.mkdirSync(storagePaths.cryptoPath, { recursive: true });
try { try {
const { StoreType } = await import("@matrix-org/matrix-sdk-crypto-nodejs"); // Use serialized import to prevent race conditions with native Rust module
cryptoStorage = new RustSdkCryptoStorageProvider(storagePaths.cryptoPath, StoreType.Sqlite); const { StoreType } = await importCryptoNodejs();
} catch (err) { cryptoStorage = new RustSdkCryptoStorageProvider(
LogService.warn( storagePaths.cryptoPath,
"MatrixClientLite", StoreType.Sqlite,
"Failed to initialize crypto storage, E2EE disabled:",
err,
); );
} catch (err) {
LogService.warn("MatrixClientLite", "Failed to initialize crypto storage, E2EE disabled:", err);
} }
} }
@ -83,7 +85,12 @@ export async function createMatrixClient(params: {
accountId: params.accountId, accountId: params.accountId,
}); });
const client = new MatrixClient(params.homeserver, params.accessToken, storage, cryptoStorage); const client = new MatrixClient(
params.homeserver,
params.accessToken,
storage,
cryptoStorage,
);
if (client.crypto) { if (client.crypto) {
const originalUpdateSyncData = client.crypto.updateSyncData.bind(client.crypto); const originalUpdateSyncData = client.crypto.updateSyncData.bind(client.crypto);

View file

@ -1,10 +1,11 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
import { LogService } from "@vector-im/matrix-bot-sdk"; import { LogService } from "@vector-im/matrix-bot-sdk";
import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
import type { CoreConfig } from "../types.js"; import type { CoreConfig } from "../types.js";
import type { MatrixAuth } from "./types.js";
import { resolveMatrixAuth } from "./config.js";
import { createMatrixClient } from "./create-client.js"; import { createMatrixClient } from "./create-client.js";
import { resolveMatrixAuth } from "./config.js";
import { DEFAULT_ACCOUNT_KEY } from "./storage.js"; import { DEFAULT_ACCOUNT_KEY } from "./storage.js";
import type { MatrixAuth } from "./types.js";
type SharedMatrixClientState = { type SharedMatrixClientState = {
client: MatrixClient; client: MatrixClient;
@ -13,6 +14,12 @@ type SharedMatrixClientState = {
cryptoReady: boolean; cryptoReady: boolean;
}; };
// Multi-account support: Map of accountKey -> client state
const sharedClients = new Map<string, SharedMatrixClientState>();
const sharedClientPromises = new Map<string, Promise<SharedMatrixClientState>>();
const sharedClientStartPromises = new Map<string, Promise<void>>();
// Legacy single-client references (for backwards compatibility)
let sharedClientState: SharedMatrixClientState | null = null; let sharedClientState: SharedMatrixClientState | null = null;
let sharedClientPromise: Promise<SharedMatrixClientState> | null = null; let sharedClientPromise: Promise<SharedMatrixClientState> | null = null;
let sharedClientStartPromise: Promise<void> | null = null; let sharedClientStartPromise: Promise<void> | null = null;
@ -27,10 +34,14 @@ function buildSharedClientKey(auth: MatrixAuth, accountId?: string | null): stri
].join("|"); ].join("|");
} }
function getAccountKey(accountId?: string | null): string {
return accountId ?? DEFAULT_ACCOUNT_KEY;
}
async function createSharedMatrixClient(params: { async function createSharedMatrixClient(params: {
auth: MatrixAuth; auth: MatrixAuth;
timeoutMs?: number; timeoutMs?: number;
accountId?: string | null; accountId?: string;
}): Promise<SharedMatrixClientState> { }): Promise<SharedMatrixClientState> {
const client = await createMatrixClient({ const client = await createMatrixClient({
homeserver: params.auth.homeserver, homeserver: params.auth.homeserver,
@ -53,15 +64,24 @@ async function ensureSharedClientStarted(params: {
timeoutMs?: number; timeoutMs?: number;
initialSyncLimit?: number; initialSyncLimit?: number;
encryption?: boolean; encryption?: boolean;
accountId?: string | null;
}): Promise<void> { }): Promise<void> {
if (params.state.started) { if (params.state.started) return;
const accountKey = getAccountKey(params.accountId);
const existingPromise = sharedClientStartPromises.get(accountKey);
if (existingPromise) {
await existingPromise;
return; return;
} }
if (sharedClientStartPromise) {
// Legacy compatibility
if (sharedClientStartPromise && !params.accountId) {
await sharedClientStartPromise; await sharedClientStartPromise;
return; return;
} }
sharedClientStartPromise = (async () => {
const startPromise = (async () => {
const client = params.state.client; const client = params.state.client;
// Initialize crypto if enabled // Initialize crypto if enabled
@ -80,10 +100,19 @@ async function ensureSharedClientStarted(params: {
await client.start(); await client.start();
params.state.started = true; params.state.started = true;
})(); })();
sharedClientStartPromises.set(accountKey, startPromise);
if (!params.accountId) {
sharedClientStartPromise = startPromise;
}
try { try {
await sharedClientStartPromise; await startPromise;
} finally { } finally {
sharedClientStartPromise = null; sharedClientStartPromises.delete(accountKey);
if (!params.accountId) {
sharedClientStartPromise = null;
}
} }
} }
@ -97,23 +126,67 @@ export async function resolveSharedMatrixClient(
accountId?: string | null; accountId?: string | null;
} = {}, } = {},
): Promise<MatrixClient> { ): Promise<MatrixClient> {
const auth = params.auth ?? (await resolveMatrixAuth({ cfg: params.cfg, env: params.env })); const auth = params.auth ?? (await resolveMatrixAuth({ cfg: params.cfg, env: params.env, accountId: params.accountId ?? undefined }));
const key = buildSharedClientKey(auth, params.accountId); const key = buildSharedClientKey(auth, params.accountId);
const accountKey = getAccountKey(params.accountId);
const shouldStart = params.startClient !== false; const shouldStart = params.startClient !== false;
if (sharedClientState?.key === key) { // Check if we already have this client in the multi-account map
const existingClient = sharedClients.get(accountKey);
if (existingClient?.key === key) {
if (shouldStart) {
await ensureSharedClientStarted({
state: existingClient,
timeoutMs: params.timeoutMs,
initialSyncLimit: auth.initialSyncLimit,
encryption: auth.encryption,
accountId: params.accountId,
});
}
// Update legacy reference for default account
if (!params.accountId || params.accountId === DEFAULT_ACCOUNT_KEY) {
sharedClientState = existingClient;
}
return existingClient.client;
}
// Legacy compatibility: check old single-client state
if (!params.accountId && sharedClientState?.key === key) {
if (shouldStart) { if (shouldStart) {
await ensureSharedClientStarted({ await ensureSharedClientStarted({
state: sharedClientState, state: sharedClientState,
timeoutMs: params.timeoutMs, timeoutMs: params.timeoutMs,
initialSyncLimit: auth.initialSyncLimit, initialSyncLimit: auth.initialSyncLimit,
encryption: auth.encryption, encryption: auth.encryption,
accountId: params.accountId,
}); });
} }
return sharedClientState.client; return sharedClientState.client;
} }
if (sharedClientPromise) { // Check for pending creation promise for this account
const pendingPromise = sharedClientPromises.get(accountKey);
if (pendingPromise) {
const pending = await pendingPromise;
if (pending.key === key) {
if (shouldStart) {
await ensureSharedClientStarted({
state: pending,
timeoutMs: params.timeoutMs,
initialSyncLimit: auth.initialSyncLimit,
encryption: auth.encryption,
accountId: params.accountId,
});
}
return pending.client;
}
// Key mismatch - stop old client
pending.client.stop();
sharedClients.delete(accountKey);
}
// Legacy: check old single-client promise
if (!params.accountId && sharedClientPromise) {
const pending = await sharedClientPromise; const pending = await sharedClientPromise;
if (pending.key === key) { if (pending.key === key) {
if (shouldStart) { if (shouldStart) {
@ -122,6 +195,7 @@ export async function resolveSharedMatrixClient(
timeoutMs: params.timeoutMs, timeoutMs: params.timeoutMs,
initialSyncLimit: auth.initialSyncLimit, initialSyncLimit: auth.initialSyncLimit,
encryption: auth.encryption, encryption: auth.encryption,
accountId: params.accountId,
}); });
} }
return pending.client; return pending.client;
@ -131,25 +205,39 @@ export async function resolveSharedMatrixClient(
sharedClientPromise = null; sharedClientPromise = null;
} }
sharedClientPromise = createSharedMatrixClient({ // Create new client
const createPromise = createSharedMatrixClient({
auth, auth,
timeoutMs: params.timeoutMs, timeoutMs: params.timeoutMs,
accountId: params.accountId, accountId: params.accountId ?? undefined,
}); });
sharedClientPromises.set(accountKey, createPromise);
if (!params.accountId || params.accountId === DEFAULT_ACCOUNT_KEY) {
sharedClientPromise = createPromise;
}
try { try {
const created = await sharedClientPromise; const created = await createPromise;
sharedClientState = created; sharedClients.set(accountKey, created);
if (!params.accountId || params.accountId === DEFAULT_ACCOUNT_KEY) {
sharedClientState = created;
}
if (shouldStart) { if (shouldStart) {
await ensureSharedClientStarted({ await ensureSharedClientStarted({
state: created, state: created,
timeoutMs: params.timeoutMs, timeoutMs: params.timeoutMs,
initialSyncLimit: auth.initialSyncLimit, initialSyncLimit: auth.initialSyncLimit,
encryption: auth.encryption, encryption: auth.encryption,
accountId: params.accountId,
}); });
} }
return created.client; return created.client;
} finally { } finally {
sharedClientPromise = null; sharedClientPromises.delete(accountKey);
if (!params.accountId || params.accountId === DEFAULT_ACCOUNT_KEY) {
sharedClientPromise = null;
}
} }
} }
@ -162,9 +250,28 @@ export async function waitForMatrixSync(_params: {
// This is kept for API compatibility but is essentially a no-op now // This is kept for API compatibility but is essentially a no-op now
} }
export function stopSharedClient(): void { export function stopSharedClient(accountId?: string | null): void {
if (sharedClientState) { if (accountId) {
sharedClientState.client.stop(); // Stop specific account
sharedClientState = null; const accountKey = getAccountKey(accountId);
const client = sharedClients.get(accountKey);
if (client) {
client.client.stop();
sharedClients.delete(accountKey);
}
// Also clear legacy reference if it matches
if (sharedClientState?.key === client?.key) {
sharedClientState = null;
}
} else {
// Stop all clients (legacy behavior + all multi-account clients)
for (const [key, client] of sharedClients) {
client.client.stop();
sharedClients.delete(key);
}
if (sharedClientState) {
sharedClientState.client.stop();
sharedClientState = null;
}
} }
} }

View file

@ -0,0 +1,88 @@
/**
* Import Mutex - Serializes dynamic imports to prevent race conditions
*
* Problem: When multiple Matrix accounts start in parallel, they all call
* dynamic imports simultaneously. Native modules (like @matrix-org/matrix-sdk-crypto-nodejs)
* can crash when loaded in parallel from multiple promises.
*
* Solution: Cache the import promise so that concurrent callers await the same promise
* instead of triggering parallel imports.
*/
// Cache for import promises - key is module specifier
const importCache = new Map<string, Promise<unknown>>();
/**
* Safely import a module with deduplication.
* If an import is already in progress, returns the existing promise.
* Once resolved, the result is cached for future calls.
*/
export async function serializedImport<T>(
moduleSpecifier: string,
importFn: () => Promise<T>
): Promise<T> {
const existing = importCache.get(moduleSpecifier);
if (existing) {
return existing as Promise<T>;
}
const importPromise = importFn().catch((err) => {
// On failure, remove from cache to allow retry
importCache.delete(moduleSpecifier);
throw err;
});
importCache.set(moduleSpecifier, importPromise);
return importPromise;
}
// Pre-cached imports for critical modules
let cryptoNodejsModule: typeof import("@matrix-org/matrix-sdk-crypto-nodejs") | null = null;
let credentialsModule: typeof import("./credentials.js") | null = null;
/**
* Safely import the crypto-nodejs module (Rust native).
* This is the most critical one - parallel imports of native modules crash.
*/
export async function importCryptoNodejs(): Promise<typeof import("@matrix-org/matrix-sdk-crypto-nodejs")> {
if (cryptoNodejsModule) return cryptoNodejsModule;
const mod = await serializedImport(
"@matrix-org/matrix-sdk-crypto-nodejs",
() => import("@matrix-org/matrix-sdk-crypto-nodejs")
);
cryptoNodejsModule = mod;
return mod;
}
/**
* Safely import the credentials module.
*/
export async function importCredentials(): Promise<typeof import("./credentials.js")> {
if (credentialsModule) return credentialsModule;
const mod = await serializedImport(
"../credentials.js",
() => import("./credentials.js")
);
credentialsModule = mod;
return mod;
}
// Pre-cached import for matrix index module
let matrixIndexModule: typeof import("./index.js") | null = null;
/**
* Safely import the matrix/index.js module.
* This is called from channel.ts during parallel account startup.
*/
export async function importMatrixIndex(): Promise<typeof import("./index.js")> {
if (matrixIndexModule) return matrixIndexModule;
const mod = await serializedImport(
"./matrix/index.js",
() => import("./index.js")
);
matrixIndexModule = mod;
return mod;
}

View file

@ -0,0 +1,14 @@
import fs from "node:fs";
import path from "node:path";
const DEBUG_LOG_PATH = "/home/keller/clawd/agents/mondo-assistant/debug-matrix.log";
export function debugLog(message: string): void {
const timestamp = new Date().toISOString();
const line = `[${timestamp}] ${message}\n`;
try {
fs.appendFileSync(DEBUG_LOG_PATH, line);
} catch {
// Ignore errors
}
}

View file

@ -1,5 +1,6 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
import type { PluginRuntime } from "openclaw/plugin-sdk"; import type { PluginRuntime } from "openclaw/plugin-sdk";
import type { MatrixAuth } from "../client.js"; import type { MatrixAuth } from "../client.js";
import type { MatrixRawEvent } from "./types.js"; import type { MatrixRawEvent } from "./types.js";
import { EventType } from "./types.js"; import { EventType } from "./types.js";
@ -25,7 +26,41 @@ export function registerMatrixMonitorEvents(params: {
onRoomMessage, onRoomMessage,
} = params; } = params;
client.on("room.message", onRoomMessage); // Track processed event IDs to avoid double-processing from room.message + room.decrypted_event
const processedEvents = new Set<string>();
const PROCESSED_EVENTS_MAX = 1000;
const deduplicatedHandler = async (roomId: string, event: MatrixRawEvent, source: string) => {
const eventId = event?.event_id;
if (!eventId) {
logVerboseMessage(`matrix: ${source} event has no id, processing anyway`);
await onRoomMessage(roomId, event);
return;
}
if (processedEvents.has(eventId)) {
logVerboseMessage(`matrix: ${source} skipping duplicate event id=${eventId}`);
return;
}
processedEvents.add(eventId);
// Prevent memory leak by clearing old entries
if (processedEvents.size > PROCESSED_EVENTS_MAX) {
const iterator = processedEvents.values();
for (let i = 0; i < 100; i++) {
const next = iterator.next();
if (next.done) break;
processedEvents.delete(next.value);
}
}
logVerboseMessage(`matrix: ${source} processing event id=${eventId} room=${roomId}`);
await onRoomMessage(roomId, event);
};
client.on("room.message", (roomId: string, event: MatrixRawEvent) => {
deduplicatedHandler(roomId, event, "room.message");
});
client.on("room.encrypted_event", (roomId: string, event: MatrixRawEvent) => { client.on("room.encrypted_event", (roomId: string, event: MatrixRawEvent) => {
const eventId = event?.event_id ?? "unknown"; const eventId = event?.event_id ?? "unknown";
@ -36,12 +71,21 @@ export function registerMatrixMonitorEvents(params: {
client.on("room.decrypted_event", (roomId: string, event: MatrixRawEvent) => { client.on("room.decrypted_event", (roomId: string, event: MatrixRawEvent) => {
const eventId = event?.event_id ?? "unknown"; const eventId = event?.event_id ?? "unknown";
const eventType = event?.type ?? "unknown"; const eventType = event?.type ?? "unknown";
const content = event?.content as Record<string, unknown> | undefined;
const hasFile = content && "file" in content;
const hasUrl = content && "url" in content;
// DEBUG: Always log decrypted events with file info
console.log(`[MATRIX-E2EE-DEBUG] decrypted_event room=${roomId} type=${eventType} id=${eventId} hasFile=${hasFile} hasUrl=${hasUrl}`);
logVerboseMessage(`matrix: decrypted event room=${roomId} type=${eventType} id=${eventId}`); logVerboseMessage(`matrix: decrypted event room=${roomId} type=${eventType} id=${eventId}`);
// Process decrypted messages through the deduplicated handler
deduplicatedHandler(roomId, event, "room.decrypted_event");
}); });
client.on( client.on(
"room.failed_decryption", "room.failed_decryption",
async (roomId: string, event: MatrixRawEvent, error: Error) => { async (roomId: string, event: MatrixRawEvent, error: Error) => {
// DEBUG: Always log failed decryption
console.log(`[MATRIX-E2EE-DEBUG] FAILED_DECRYPTION room=${roomId} id=${event.event_id ?? "unknown"} error=${error.message}`);
logger.warn( logger.warn(
{ roomId, eventId: event.event_id, error: error.message }, { roomId, eventId: event.event_id, error: error.message },
"Failed to decrypt message", "Failed to decrypt message",
@ -83,7 +127,8 @@ export function registerMatrixMonitorEvents(params: {
const hint = formatNativeDependencyHint({ const hint = formatNativeDependencyHint({
packageName: "@matrix-org/matrix-sdk-crypto-nodejs", packageName: "@matrix-org/matrix-sdk-crypto-nodejs",
manager: "pnpm", manager: "pnpm",
downloadCommand: "node node_modules/@matrix-org/matrix-sdk-crypto-nodejs/download-lib.js", downloadCommand:
"node node_modules/@matrix-org/matrix-sdk-crypto-nodejs/download-lib.js",
}); });
const warning = `matrix: encryption enabled but crypto is unavailable; ${hint}`; const warning = `matrix: encryption enabled but crypto is unavailable; ${hint}`;
logger.warn({ roomId }, warning); logger.warn({ roomId }, warning);

View file

@ -1,4 +1,14 @@
import type { LocationMessageEventContent, MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { LocationMessageEventContent, MatrixClient } from "@vector-im/matrix-bot-sdk";
import fs from "node:fs";
// File-based debug logging
const DEBUG_LOG = "/home/keller/clawd/agents/mondo-assistant/matrix-debug.log";
function debugWrite(msg: string) {
try {
fs.appendFileSync(DEBUG_LOG, `[${new Date().toISOString()}] ${msg}\n`);
} catch { /* ignore */ }
}
import { import {
createReplyPrefixContext, createReplyPrefixContext,
createTypingCallbacks, createTypingCallbacks,
@ -9,30 +19,25 @@ import {
type RuntimeEnv, type RuntimeEnv,
} from "openclaw/plugin-sdk"; } from "openclaw/plugin-sdk";
import type { CoreConfig, ReplyToMode } from "../../types.js"; import type { CoreConfig, ReplyToMode } from "../../types.js";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
import { import {
formatPollAsText, formatPollAsText,
isPollStartType, isPollStartType,
parsePollStartContent, parsePollStartContent,
type PollStartContent, type PollStartContent,
} from "../poll-types.js"; } from "../poll-types.js";
import { import { reactMatrixMessage, sendMessageMatrix, sendReadReceiptMatrix, sendTypingMatrix } from "../send.js";
reactMatrixMessage,
sendMessageMatrix,
sendReadReceiptMatrix,
sendTypingMatrix,
} from "../send.js";
import { import {
resolveMatrixAllowListMatch, resolveMatrixAllowListMatch,
resolveMatrixAllowListMatches, resolveMatrixAllowListMatches,
normalizeAllowListLower, normalizeAllowListLower,
} from "./allowlist.js"; } from "./allowlist.js";
import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js";
import { downloadMatrixMedia } from "./media.js"; import { downloadMatrixMedia } from "./media.js";
import { resolveMentions } from "./mentions.js"; import { resolveMentions } from "./mentions.js";
import { deliverMatrixReplies } from "./replies.js"; import { deliverMatrixReplies } from "./replies.js";
import { resolveMatrixRoomConfig } from "./rooms.js"; import { resolveMatrixRoomConfig } from "./rooms.js";
import { resolveMatrixThreadRootId, resolveMatrixThreadTarget } from "./threads.js"; import { resolveMatrixThreadRootId, resolveMatrixThreadTarget } from "./threads.js";
import { resolveMatrixLocation, type MatrixLocationPayload } from "./location.js";
import type { MatrixRawEvent, RoomMessageEventContent } from "./types.js";
import { EventType, RelationType } from "./types.js"; import { EventType, RelationType } from "./types.js";
export type MatrixMonitorHandlerParams = { export type MatrixMonitorHandlerParams = {
@ -41,7 +46,7 @@ export type MatrixMonitorHandlerParams = {
logging: { logging: {
shouldLogVerbose: () => boolean; shouldLogVerbose: () => boolean;
}; };
channel: (typeof import("openclaw/plugin-sdk"))["channel"]; channel: typeof import("openclaw/plugin-sdk")["channel"];
system: { system: {
enqueueSystemEvent: ( enqueueSystemEvent: (
text: string, text: string,
@ -63,7 +68,7 @@ export type MatrixMonitorHandlerParams = {
: Record<string, unknown> | undefined : Record<string, unknown> | undefined
: Record<string, unknown> | undefined; : Record<string, unknown> | undefined;
mentionRegexes: ReturnType< mentionRegexes: ReturnType<
(typeof import("openclaw/plugin-sdk"))["channel"]["mentions"]["buildMentionRegexes"] typeof import("openclaw/plugin-sdk")["channel"]["mentions"]["buildMentionRegexes"]
>; >;
groupPolicy: "open" | "allowlist" | "disabled"; groupPolicy: "open" | "allowlist" | "disabled";
replyToMode: ReplyToMode; replyToMode: ReplyToMode;
@ -81,10 +86,10 @@ export type MatrixMonitorHandlerParams = {
selfUserId: string; selfUserId: string;
}) => Promise<boolean>; }) => Promise<boolean>;
}; };
getRoomInfo: ( getRoomInfo: (roomId: string) => Promise<{ name?: string; canonicalAlias?: string; altAliases: string[] }>;
roomId: string,
) => Promise<{ name?: string; canonicalAlias?: string; altAliases: string[] }>;
getMemberDisplayName: (roomId: string, userId: string) => Promise<string>; getMemberDisplayName: (roomId: string, userId: string) => Promise<string>;
/** Account ID for multi-account routing */
accountId?: string | null;
}; };
export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParams) { export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParams) {
@ -110,12 +115,15 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
directTracker, directTracker,
getRoomInfo, getRoomInfo,
getMemberDisplayName, getMemberDisplayName,
accountId,
} = params; } = params;
return async (roomId: string, event: MatrixRawEvent) => { return async (roomId: string, event: MatrixRawEvent) => {
debugWrite(`HANDLER-START: room=${roomId} eventId=${event.event_id ?? "unknown"} type=${event.type} sender=${event.sender} accountId=${accountId ?? "default"}`);
try { try {
const eventType = event.type; const eventType = event.type;
if (eventType === EventType.RoomMessageEncrypted) { if (eventType === EventType.RoomMessageEncrypted) {
debugWrite(`HANDLER: SKIP encrypted event (should be auto-decrypted)`);
// Encrypted messages are decrypted automatically by @vector-im/matrix-bot-sdk with crypto enabled // Encrypted messages are decrypted automatically by @vector-im/matrix-bot-sdk with crypto enabled
return; return;
} }
@ -124,24 +132,17 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const locationContent = event.content as LocationMessageEventContent; const locationContent = event.content as LocationMessageEventContent;
const isLocationEvent = const isLocationEvent =
eventType === EventType.Location || eventType === EventType.Location ||
(eventType === EventType.RoomMessage && locationContent.msgtype === EventType.Location); (eventType === EventType.RoomMessage &&
if (eventType !== EventType.RoomMessage && !isPollEvent && !isLocationEvent) { locationContent.msgtype === EventType.Location);
return; if (eventType !== EventType.RoomMessage && !isPollEvent && !isLocationEvent) return;
}
logVerboseMessage( logVerboseMessage(
`matrix: room.message recv room=${roomId} type=${eventType} id=${event.event_id ?? "unknown"}`, `matrix: room.message recv room=${roomId} type=${eventType} id=${event.event_id ?? "unknown"}`,
); );
if (event.unsigned?.redacted_because) { if (event.unsigned?.redacted_because) return;
return;
}
const senderId = event.sender; const senderId = event.sender;
if (!senderId) { if (!senderId) return;
return;
}
const selfUserId = await client.getUserId(); const selfUserId = await client.getUserId();
if (senderId === selfUserId) { if (senderId === selfUserId) return;
return;
}
const eventTs = event.origin_server_ts; const eventTs = event.origin_server_ts;
const eventAge = event.unsigned?.age; const eventAge = event.unsigned?.age;
if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) { if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) {
@ -157,7 +158,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const roomInfo = await getRoomInfo(roomId); const roomInfo = await getRoomInfo(roomId);
const roomName = roomInfo.name; const roomName = roomInfo.name;
const roomAliases = [roomInfo.canonicalAlias ?? "", ...roomInfo.altAliases].filter(Boolean); const roomAliases = [
roomInfo.canonicalAlias ?? "",
...roomInfo.altAliases,
].filter(Boolean);
let content = event.content as RoomMessageEventContent; let content = event.content as RoomMessageEventContent;
if (isPollEvent) { if (isPollEvent) {
@ -186,9 +190,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const relates = content["m.relates_to"]; const relates = content["m.relates_to"];
if (relates && "rel_type" in relates) { if (relates && "rel_type" in relates) {
if (relates.rel_type === RelationType.Replace) { if (relates.rel_type === RelationType.Replace) return;
return;
}
} }
const isDirectMessage = await directTracker.isDirectMessage({ const isDirectMessage = await directTracker.isDirectMessage({
@ -198,9 +200,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}); });
const isRoom = !isDirectMessage; const isRoom = !isDirectMessage;
if (isRoom && groupPolicy === "disabled") { if (isRoom && groupPolicy === "disabled") return;
return;
}
const roomConfigInfo = isRoom const roomConfigInfo = isRoom
? resolveMatrixRoomConfig({ ? resolveMatrixRoomConfig({
@ -233,9 +233,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
} }
const senderName = await getMemberDisplayName(roomId, senderId); const senderName = await getMemberDisplayName(roomId, senderId);
const storeAllowFrom = await core.channel.pairing const storeAllowFrom = await core.channel.pairing.readAllowFromStore("matrix").catch(() => []);
.readAllowFromStore("matrix")
.catch(() => []);
const effectiveAllowFrom = normalizeAllowListLower([...allowFrom, ...storeAllowFrom]); const effectiveAllowFrom = normalizeAllowListLower([...allowFrom, ...storeAllowFrom]);
const groupAllowFrom = cfg.channels?.matrix?.groupAllowFrom ?? []; const groupAllowFrom = cfg.channels?.matrix?.groupAllowFrom ?? [];
const effectiveGroupAllowFrom = normalizeAllowListLower([ const effectiveGroupAllowFrom = normalizeAllowListLower([
@ -245,9 +243,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const groupAllowConfigured = effectiveGroupAllowFrom.length > 0; const groupAllowConfigured = effectiveGroupAllowFrom.length > 0;
if (isDirectMessage) { if (isDirectMessage) {
if (!dmEnabled || dmPolicy === "disabled") { if (!dmEnabled || dmPolicy === "disabled") return;
return;
}
if (dmPolicy !== "open") { if (dmPolicy !== "open") {
const allowMatch = resolveMatrixAllowListMatch({ const allowMatch = resolveMatrixAllowListMatch({
allowList: effectiveAllowFrom, allowList: effectiveAllowFrom,
@ -329,8 +325,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
logVerboseMessage(`matrix: allow room ${roomId} (${roomMatchMeta})`); logVerboseMessage(`matrix: allow room ${roomId} (${roomMatchMeta})`);
} }
const rawBody = const rawBody = locationPayload?.text
locationPayload?.text ?? (typeof content.body === "string" ? content.body.trim() : ""); ?? (typeof content.body === "string" ? content.body.trim() : "");
let media: { let media: {
path: string; path: string;
contentType?: string; contentType?: string;
@ -343,7 +339,14 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
? content.file ? content.file
: undefined; : undefined;
const mediaUrl = contentUrl ?? contentFile?.url; const mediaUrl = contentUrl ?? contentFile?.url;
// DEBUG: Log media detection
const msgtype = "msgtype" in content ? content.msgtype : undefined;
debugWrite(`HANDLER: room=${roomId} sender=${senderId} msgtype=${msgtype} contentUrl=${contentUrl ?? "none"} mediaUrl=${mediaUrl ?? "none"} accountId=${accountId ?? "default"}`);
logVerboseMessage(`matrix: content check msgtype=${msgtype} contentUrl=${contentUrl ?? "none"} mediaUrl=${mediaUrl ?? "none"} rawBody="${rawBody.slice(0,50)}"`);
if (!rawBody && !mediaUrl) { if (!rawBody && !mediaUrl) {
debugWrite(`HANDLER: SKIP - no rawBody and no mediaUrl`);
return; return;
} }
@ -352,8 +355,11 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
? (content.info as { mimetype?: string; size?: number }) ? (content.info as { mimetype?: string; size?: number })
: undefined; : undefined;
const contentType = contentInfo?.mimetype; const contentType = contentInfo?.mimetype;
const contentSize = typeof contentInfo?.size === "number" ? contentInfo.size : undefined; const contentSize =
typeof contentInfo?.size === "number" ? contentInfo.size : undefined;
if (mediaUrl?.startsWith("mxc://")) { if (mediaUrl?.startsWith("mxc://")) {
debugWrite(`HANDLER: attempting media download url=${mediaUrl} size=${contentSize ?? "unknown"} maxBytes=${mediaMaxBytes}`);
logVerboseMessage(`matrix: attempting media download url=${mediaUrl} size=${contentSize ?? "unknown"} maxBytes=${mediaMaxBytes}`);
try { try {
media = await downloadMatrixMedia({ media = await downloadMatrixMedia({
client, client,
@ -363,15 +369,19 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
maxBytes: mediaMaxBytes, maxBytes: mediaMaxBytes,
file: contentFile, file: contentFile,
}); });
debugWrite(`HANDLER: media download SUCCESS path=${media?.path ?? "none"}`);
logVerboseMessage(`matrix: media download success path=${media?.path ?? "none"}`);
} catch (err) { } catch (err) {
debugWrite(`HANDLER: media download FAILED: ${String(err)}`);
logVerboseMessage(`matrix: media download failed: ${String(err)}`); logVerboseMessage(`matrix: media download failed: ${String(err)}`);
} }
} else if (mediaUrl) {
debugWrite(`HANDLER: skipping non-mxc url=${mediaUrl}`);
logVerboseMessage(`matrix: skipping non-mxc media url=${mediaUrl}`);
} }
const bodyText = rawBody || media?.placeholder || ""; const bodyText = rawBody || media?.placeholder || "";
if (!bodyText) { if (!bodyText) return;
return;
}
const { wasMentioned, hasExplicitMention } = resolveMentions({ const { wasMentioned, hasExplicitMention } = resolveMentions({
content, content,
@ -461,6 +471,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const route = core.channel.routing.resolveAgentRoute({ const route = core.channel.routing.resolveAgentRoute({
cfg, cfg,
channel: "matrix", channel: "matrix",
accountId: accountId ?? undefined,
peer: { peer: {
kind: isDirectMessage ? "dm" : "channel", kind: isDirectMessage ? "dm" : "channel",
id: isDirectMessage ? senderId : roomId, id: isDirectMessage ? senderId : roomId,
@ -512,7 +523,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
MediaPath: media?.path, MediaPath: media?.path,
MediaType: media?.contentType, MediaType: media?.contentType,
MediaUrl: media?.path, MediaUrl: media?.path,
...locationPayload?.context, ...(locationPayload?.context ?? {}),
CommandAuthorized: commandAuthorized, CommandAuthorized: commandAuthorized,
CommandSource: "text" as const, CommandSource: "text" as const,
OriginatingChannel: "matrix" as const, OriginatingChannel: "matrix" as const,
@ -533,11 +544,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
: undefined, : undefined,
onRecordError: (err) => { onRecordError: (err) => {
logger.warn( logger.warn(
{ { error: String(err), storePath, sessionKey: ctxPayload.SessionKey ?? route.sessionKey },
error: String(err),
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
},
"failed updating session meta", "failed updating session meta",
); );
}, },
@ -551,19 +558,19 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const shouldAckReaction = () => const shouldAckReaction = () =>
Boolean( Boolean(
ackReaction && ackReaction &&
core.channel.reactions.shouldAckReaction({ core.channel.reactions.shouldAckReaction({
scope: ackScope, scope: ackScope,
isDirect: isDirectMessage, isDirect: isDirectMessage,
isGroup: isRoom, isGroup: isRoom,
isMentionableGroup: isRoom, isMentionableGroup: isRoom,
requireMention: Boolean(shouldRequireMention), requireMention: Boolean(shouldRequireMention),
canDetectMention, canDetectMention,
effectiveWasMentioned: wasMentioned || shouldBypassMention, effectiveWasMentioned: wasMentioned || shouldBypassMention,
shouldBypassMention, shouldBypassMention,
}), }),
); );
if (shouldAckReaction() && messageId) { if (shouldAckReaction() && messageId) {
reactMatrixMessage(roomId, messageId, ackReaction, client).catch((err) => { reactMatrixMessage(roomId, messageId, ackReaction, { client }).catch((err) => {
logVerboseMessage(`matrix react failed for room ${roomId}: ${String(err)}`); logVerboseMessage(`matrix react failed for room ${roomId}: ${String(err)}`);
}); });
} }
@ -648,9 +655,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
}, },
}); });
markDispatchIdle(); markDispatchIdle();
if (!queuedFinal) { if (!queuedFinal) return;
return;
}
didSendReply = true; didSendReply = true;
const finalCount = counts.final; const finalCount = counts.final;
logVerboseMessage( logVerboseMessage(

View file

@ -1,8 +1,11 @@
import { format } from "node:util"; import { format } from "node:util";
import { mergeAllowlist, summarizeMapping, type RuntimeEnv } from "openclaw/plugin-sdk";
import {
mergeAllowlist,
summarizeMapping,
type RuntimeEnv,
} from "openclaw/plugin-sdk";
import type { CoreConfig, ReplyToMode } from "../../types.js"; import type { CoreConfig, ReplyToMode } from "../../types.js";
import { resolveMatrixTargets } from "../../resolve-targets.js";
import { getMatrixRuntime } from "../../runtime.js";
import { setActiveMatrixClient } from "../active-client.js"; import { setActiveMatrixClient } from "../active-client.js";
import { import {
isBunRuntime, isBunRuntime,
@ -15,6 +18,8 @@ import { createDirectRoomTracker } from "./direct.js";
import { registerMatrixMonitorEvents } from "./events.js"; import { registerMatrixMonitorEvents } from "./events.js";
import { createMatrixRoomMessageHandler } from "./handler.js"; import { createMatrixRoomMessageHandler } from "./handler.js";
import { createMatrixRoomInfoResolver } from "./room-info.js"; import { createMatrixRoomInfoResolver } from "./room-info.js";
import { resolveMatrixTargets } from "../../resolve-targets.js";
import { getMatrixRuntime } from "../../runtime.js";
export type MonitorMatrixOpts = { export type MonitorMatrixOpts = {
runtime?: RuntimeEnv; runtime?: RuntimeEnv;
@ -33,9 +38,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
} }
const core = getMatrixRuntime(); const core = getMatrixRuntime();
let cfg = core.config.loadConfig() as CoreConfig; let cfg = core.config.loadConfig() as CoreConfig;
if (cfg.channels?.matrix?.enabled === false) { if (cfg.channels?.matrix?.enabled === false) return;
return;
}
const logger = core.logging.getChildLogger({ module: "matrix-auto-reply" }); const logger = core.logging.getChildLogger({ module: "matrix-auto-reply" });
const formatRuntimeMessage = (...args: Parameters<RuntimeEnv["log"]>) => format(...args); const formatRuntimeMessage = (...args: Parameters<RuntimeEnv["log"]>) => format(...args);
@ -51,22 +54,14 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
}, },
}; };
const logVerboseMessage = (message: string) => { const logVerboseMessage = (message: string) => {
if (!core.logging.shouldLogVerbose()) { if (!core.logging.shouldLogVerbose()) return;
return;
}
logger.debug(message); logger.debug(message);
}; };
const normalizeUserEntry = (raw: string) => const normalizeUserEntry = (raw: string) =>
raw raw.replace(/^matrix:/i, "").replace(/^user:/i, "").trim();
.replace(/^matrix:/i, "")
.replace(/^user:/i, "")
.trim();
const normalizeRoomEntry = (raw: string) => const normalizeRoomEntry = (raw: string) =>
raw raw.replace(/^matrix:/i, "").replace(/^(room|channel):/i, "").trim();
.replace(/^matrix:/i, "")
.replace(/^(room|channel):/i, "")
.trim();
const isMatrixUserId = (value: string) => value.startsWith("@") && value.includes(":"); const isMatrixUserId = (value: string) => value.startsWith("@") && value.includes(":");
const allowlistOnly = cfg.channels?.matrix?.allowlistOnly === true; const allowlistOnly = cfg.channels?.matrix?.allowlistOnly === true;
@ -118,9 +113,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
const pending: Array<{ input: string; query: string }> = []; const pending: Array<{ input: string; query: string }> = [];
for (const entry of entries) { for (const entry of entries) {
const trimmed = entry.trim(); const trimmed = entry.trim();
if (!trimmed) { if (!trimmed) continue;
continue;
}
const cleaned = normalizeRoomEntry(trimmed); const cleaned = normalizeRoomEntry(trimmed);
if (cleaned.startsWith("!") && cleaned.includes(":")) { if (cleaned.startsWith("!") && cleaned.includes(":")) {
if (!nextRooms[cleaned]) { if (!nextRooms[cleaned]) {
@ -140,9 +133,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
}); });
resolved.forEach((entry, index) => { resolved.forEach((entry, index) => {
const source = pending[index]; const source = pending[index];
if (!source) { if (!source) return;
return;
}
if (entry.resolved && entry.id) { if (entry.resolved && entry.id) {
if (!nextRooms[entry.id]) { if (!nextRooms[entry.id]) {
nextRooms[entry.id] = roomsConfig[source.input]; nextRooms[entry.id] = roomsConfig[source.input];
@ -172,7 +163,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
}, },
}; };
const auth = await resolveMatrixAuth({ cfg }); const auth = await resolveMatrixAuth({ cfg, accountId: opts.accountId });
const resolvedInitialSyncLimit = const resolvedInitialSyncLimit =
typeof opts.initialSyncLimit === "number" typeof opts.initialSyncLimit === "number"
? Math.max(0, Math.floor(opts.initialSyncLimit)) ? Math.max(0, Math.floor(opts.initialSyncLimit))
@ -187,7 +178,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
startClient: false, startClient: false,
accountId: opts.accountId, accountId: opts.accountId,
}); });
setActiveMatrixClient(client); setActiveMatrixClient(client, opts.accountId);
const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg); const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg);
const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy; const defaultGroupPolicy = cfg.channels?.defaults?.groupPolicy;
@ -232,6 +223,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
directTracker, directTracker,
getRoomInfo, getRoomInfo,
getMemberDisplayName, getMemberDisplayName,
accountId: opts.accountId,
}); });
registerMatrixMonitorEvents({ registerMatrixMonitorEvents({
@ -265,20 +257,17 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
logger.info("matrix: device verification requested - please verify in another client"); logger.info("matrix: device verification requested - please verify in another client");
} }
} catch (err) { } catch (err) {
logger.debug( logger.debug({ error: String(err) }, "Device verification request failed (may already be verified)");
{ error: String(err) },
"Device verification request failed (may already be verified)",
);
} }
} }
await new Promise<void>((resolve) => { await new Promise<void>((resolve) => {
const onAbort = () => { const onAbort = () => {
try { try {
logVerboseMessage("matrix: stopping client"); logVerboseMessage(`matrix: stopping client for account ${opts.accountId ?? "default"}`);
stopSharedClient(); stopSharedClient(opts.accountId);
} finally { } finally {
setActiveMatrixClient(null); setActiveMatrixClient(null, opts.accountId);
resolve(); resolve();
} }
}; };

View file

@ -1,4 +1,5 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
import { getMatrixRuntime } from "../../runtime.js"; import { getMatrixRuntime } from "../../runtime.js";
// Type for encrypted file info // Type for encrypted file info
@ -23,19 +24,21 @@ async function fetchMatrixMediaBuffer(params: {
}): Promise<{ buffer: Buffer; headerType?: string } | null> { }): Promise<{ buffer: Buffer; headerType?: string } | null> {
// @vector-im/matrix-bot-sdk provides mxcToHttp helper // @vector-im/matrix-bot-sdk provides mxcToHttp helper
const url = params.client.mxcToHttp(params.mxcUrl); const url = params.client.mxcToHttp(params.mxcUrl);
if (!url) { if (!url) return null;
return null;
}
// Use the client's download method which handles auth // Use the client's download method which handles auth
try { try {
const buffer = await params.client.downloadContent(params.mxcUrl); // downloadContent returns {data: Buffer, contentType: string}
const response = await params.client.downloadContent(params.mxcUrl);
const buffer = response.data;
const contentType = response.contentType;
if (buffer.byteLength > params.maxBytes) { if (buffer.byteLength > params.maxBytes) {
throw new Error("Matrix media exceeds configured size limit"); throw new Error("Matrix media exceeds configured size limit");
} }
return { buffer: Buffer.from(buffer) }; return { buffer: Buffer.from(buffer), headerType: contentType };
} catch (err) { } catch (err) {
throw new Error(`Matrix media download failed: ${String(err)}`, { cause: err }); throw new Error(`Matrix media download failed: ${String(err)}`);
} }
} }
@ -75,7 +78,10 @@ export async function downloadMatrixMedia(params: {
placeholder: string; placeholder: string;
} | null> { } | null> {
let fetched: { buffer: Buffer; headerType?: string } | null; let fetched: { buffer: Buffer; headerType?: string } | null;
if (typeof params.sizeBytes === "number" && params.sizeBytes > params.maxBytes) { if (
typeof params.sizeBytes === "number" &&
params.sizeBytes > params.maxBytes
) {
throw new Error("Matrix media exceeds configured size limit"); throw new Error("Matrix media exceeds configured size limit");
} }
@ -95,9 +101,7 @@ export async function downloadMatrixMedia(params: {
}); });
} }
if (!fetched) { if (!fetched) return null;
return null;
}
const headerType = fetched.headerType ?? params.contentType ?? undefined; const headerType = fetched.headerType ?? params.contentType ?? undefined;
const saved = await getMatrixRuntime().channel.media.saveMediaBuffer( const saved = await getMatrixRuntime().channel.media.saveMediaBuffer(
fetched.buffer, fetched.buffer,

View file

@ -1,4 +1,5 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
import type { PollInput } from "openclaw/plugin-sdk"; import type { PollInput } from "openclaw/plugin-sdk";
import { getMatrixRuntime } from "../runtime.js"; import { getMatrixRuntime } from "../runtime.js";
import { buildPollStartContent, M_POLL_START } from "./poll-types.js"; import { buildPollStartContent, M_POLL_START } from "./poll-types.js";
@ -45,6 +46,7 @@ export async function sendMessageMatrix(
const { client, stopOnDone } = await resolveMatrixClient({ const { client, stopOnDone } = await resolveMatrixClient({
client: opts.client, client: opts.client,
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
}); });
try { try {
const roomId = await resolveMatrixRoomId(client, to); const roomId = await resolveMatrixRoomId(client, to);
@ -122,9 +124,7 @@ export async function sendMessageMatrix(
const followupRelation = threadId ? relation : undefined; const followupRelation = threadId ? relation : undefined;
for (const chunk of textChunks) { for (const chunk of textChunks) {
const text = chunk.trim(); const text = chunk.trim();
if (!text) { if (!text) continue;
continue;
}
const followup = buildTextContent(text, followupRelation); const followup = buildTextContent(text, followupRelation);
const followupEventId = await sendContent(followup); const followupEventId = await sendContent(followup);
lastMessageId = followupEventId ?? lastMessageId; lastMessageId = followupEventId ?? lastMessageId;
@ -132,9 +132,7 @@ export async function sendMessageMatrix(
} else { } else {
for (const chunk of chunks.length ? chunks : [""]) { for (const chunk of chunks.length ? chunks : [""]) {
const text = chunk.trim(); const text = chunk.trim();
if (!text) { if (!text) continue;
continue;
}
const content = buildTextContent(text, relation); const content = buildTextContent(text, relation);
const eventId = await sendContent(content); const eventId = await sendContent(content);
lastMessageId = eventId ?? lastMessageId; lastMessageId = eventId ?? lastMessageId;
@ -214,9 +212,7 @@ export async function sendReadReceiptMatrix(
eventId: string, eventId: string,
client?: MatrixClient, client?: MatrixClient,
): Promise<void> { ): Promise<void> {
if (!eventId?.trim()) { if (!eventId?.trim()) return;
return;
}
const { client: resolved, stopOnDone } = await resolveMatrixClient({ const { client: resolved, stopOnDone } = await resolveMatrixClient({
client, client,
}); });
@ -234,13 +230,14 @@ export async function reactMatrixMessage(
roomId: string, roomId: string,
messageId: string, messageId: string,
emoji: string, emoji: string,
client?: MatrixClient, opts: { client?: MatrixClient; accountId?: string | null } = {},
): Promise<void> { ): Promise<void> {
if (!emoji.trim()) { if (!emoji.trim()) {
throw new Error("Matrix reaction requires an emoji"); throw new Error("Matrix reaction requires an emoji");
} }
const { client: resolved, stopOnDone } = await resolveMatrixClient({ const { client: resolved, stopOnDone } = await resolveMatrixClient({
client, client: opts.client,
accountId: opts.accountId,
}); });
try { try {
const resolvedRoom = await resolveMatrixRoomId(resolved, roomId); const resolvedRoom = await resolveMatrixRoomId(resolved, roomId);

View file

@ -1,5 +1,5 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
import type { CoreConfig } from "../types.js";
import { getMatrixRuntime } from "../../runtime.js"; import { getMatrixRuntime } from "../../runtime.js";
import { getActiveMatrixClient } from "../active-client.js"; import { getActiveMatrixClient } from "../active-client.js";
import { import {
@ -8,6 +8,7 @@ import {
resolveMatrixAuth, resolveMatrixAuth,
resolveSharedMatrixClient, resolveSharedMatrixClient,
} from "../client.js"; } from "../client.js";
import type { CoreConfig } from "../types.js";
const getCore = () => getMatrixRuntime(); const getCore = () => getMatrixRuntime();
@ -28,29 +29,31 @@ export function resolveMediaMaxBytes(): number | undefined {
export async function resolveMatrixClient(opts: { export async function resolveMatrixClient(opts: {
client?: MatrixClient; client?: MatrixClient;
timeoutMs?: number; timeoutMs?: number;
accountId?: string | null;
}): Promise<{ client: MatrixClient; stopOnDone: boolean }> { }): Promise<{ client: MatrixClient; stopOnDone: boolean }> {
ensureNodeRuntime(); ensureNodeRuntime();
if (opts.client) { if (opts.client) return { client: opts.client, stopOnDone: false };
return { client: opts.client, stopOnDone: false };
} // Try to get the active client for the specified account
const active = getActiveMatrixClient(); const active = getActiveMatrixClient(opts.accountId);
if (active) { if (active) return { client: active, stopOnDone: false };
return { client: active, stopOnDone: false };
}
const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT); const shouldShareClient = Boolean(process.env.OPENCLAW_GATEWAY_PORT);
if (shouldShareClient) { if (shouldShareClient) {
const client = await resolveSharedMatrixClient({ const client = await resolveSharedMatrixClient({
timeoutMs: opts.timeoutMs, timeoutMs: opts.timeoutMs,
accountId: opts.accountId,
}); });
return { client, stopOnDone: false }; return { client, stopOnDone: false };
} }
const auth = await resolveMatrixAuth(); const auth = await resolveMatrixAuth({ accountId: opts.accountId ?? undefined });
const client = await createMatrixClient({ const client = await createMatrixClient({
homeserver: auth.homeserver, homeserver: auth.homeserver,
userId: auth.userId, userId: auth.userId,
accessToken: auth.accessToken, accessToken: auth.accessToken,
encryption: auth.encryption, encryption: auth.encryption,
localTimeoutMs: opts.timeoutMs, localTimeoutMs: opts.timeoutMs,
accountId: opts.accountId ?? undefined,
}); });
if (auth.encryption && client.crypto) { if (auth.encryption && client.crypto) {
try { try {

View file

@ -1,11 +1,5 @@
import type { AgentToolResult } from "@mariozechner/pi-agent-core"; import type { AgentToolResult } from "@mariozechner/pi-agent-core";
import {
createActionGate,
jsonResult,
readNumberParam,
readReactionParams,
readStringParam,
} from "openclaw/plugin-sdk";
import type { CoreConfig } from "./types.js"; import type { CoreConfig } from "./types.js";
import { import {
deleteMatrixMessage, deleteMatrixMessage,
@ -21,6 +15,13 @@ import {
unpinMatrixMessage, unpinMatrixMessage,
} from "./matrix/actions.js"; } from "./matrix/actions.js";
import { reactMatrixMessage } from "./matrix/send.js"; import { reactMatrixMessage } from "./matrix/send.js";
import {
createActionGate,
jsonResult,
readNumberParam,
readReactionParams,
readStringParam,
} from "openclaw/plugin-sdk";
const messageActions = new Set(["sendMessage", "editMessage", "deleteMessage", "readMessages"]); const messageActions = new Set(["sendMessage", "editMessage", "deleteMessage", "readMessages"]);
const reactionActions = new Set(["react", "reactions"]); const reactionActions = new Set(["react", "reactions"]);
@ -28,12 +29,8 @@ const pinActions = new Set(["pinMessage", "unpinMessage", "listPins"]);
function readRoomId(params: Record<string, unknown>, required = true): string { function readRoomId(params: Record<string, unknown>, required = true): string {
const direct = readStringParam(params, "roomId") ?? readStringParam(params, "channelId"); const direct = readStringParam(params, "roomId") ?? readStringParam(params, "channelId");
if (direct) { if (direct) return direct;
return direct; if (!required) return readStringParam(params, "to") ?? "";
}
if (!required) {
return readStringParam(params, "to") ?? "";
}
return readStringParam(params, "to", { required: true }); return readStringParam(params, "to", { required: true });
} }
@ -42,6 +39,7 @@ export async function handleMatrixAction(
cfg: CoreConfig, cfg: CoreConfig,
): Promise<AgentToolResult<unknown>> { ): Promise<AgentToolResult<unknown>> {
const action = readStringParam(params, "action", { required: true }); const action = readStringParam(params, "action", { required: true });
const accountId = readStringParam(params, "accountId") ?? undefined;
const isActionEnabled = createActionGate(cfg.channels?.matrix?.actions); const isActionEnabled = createActionGate(cfg.channels?.matrix?.actions);
if (reactionActions.has(action)) { if (reactionActions.has(action)) {
@ -57,13 +55,14 @@ export async function handleMatrixAction(
if (remove || isEmpty) { if (remove || isEmpty) {
const result = await removeMatrixReactions(roomId, messageId, { const result = await removeMatrixReactions(roomId, messageId, {
emoji: remove ? emoji : undefined, emoji: remove ? emoji : undefined,
accountId,
}); });
return jsonResult({ ok: true, removed: result.removed }); return jsonResult({ ok: true, removed: result.removed });
} }
await reactMatrixMessage(roomId, messageId, emoji); await reactMatrixMessage(roomId, messageId, emoji, { accountId });
return jsonResult({ ok: true, added: emoji }); return jsonResult({ ok: true, added: emoji });
} }
const reactions = await listMatrixReactions(roomId, messageId); const reactions = await listMatrixReactions(roomId, messageId, { accountId });
return jsonResult({ ok: true, reactions }); return jsonResult({ ok: true, reactions });
} }
@ -79,13 +78,13 @@ export async function handleMatrixAction(
allowEmpty: true, allowEmpty: true,
}); });
const mediaUrl = readStringParam(params, "mediaUrl"); const mediaUrl = readStringParam(params, "mediaUrl");
const replyToId = const replyToId = readStringParam(params, "replyToId") ?? readStringParam(params, "replyTo");
readStringParam(params, "replyToId") ?? readStringParam(params, "replyTo");
const threadId = readStringParam(params, "threadId"); const threadId = readStringParam(params, "threadId");
const result = await sendMatrixMessage(to, content, { const result = await sendMatrixMessage(to, content, {
mediaUrl: mediaUrl ?? undefined, mediaUrl: mediaUrl ?? undefined,
replyToId: replyToId ?? undefined, replyToId: replyToId ?? undefined,
threadId: threadId ?? undefined, threadId: threadId ?? undefined,
accountId,
}); });
return jsonResult({ ok: true, result }); return jsonResult({ ok: true, result });
} }
@ -93,14 +92,14 @@ export async function handleMatrixAction(
const roomId = readRoomId(params); const roomId = readRoomId(params);
const messageId = readStringParam(params, "messageId", { required: true }); const messageId = readStringParam(params, "messageId", { required: true });
const content = readStringParam(params, "content", { required: true }); const content = readStringParam(params, "content", { required: true });
const result = await editMatrixMessage(roomId, messageId, content); const result = await editMatrixMessage(roomId, messageId, content, { accountId });
return jsonResult({ ok: true, result }); return jsonResult({ ok: true, result });
} }
case "deleteMessage": { case "deleteMessage": {
const roomId = readRoomId(params); const roomId = readRoomId(params);
const messageId = readStringParam(params, "messageId", { required: true }); const messageId = readStringParam(params, "messageId", { required: true });
const reason = readStringParam(params, "reason"); const reason = readStringParam(params, "reason");
await deleteMatrixMessage(roomId, messageId, { reason: reason ?? undefined }); await deleteMatrixMessage(roomId, messageId, { reason: reason ?? undefined, accountId });
return jsonResult({ ok: true, deleted: true }); return jsonResult({ ok: true, deleted: true });
} }
case "readMessages": { case "readMessages": {
@ -112,6 +111,7 @@ export async function handleMatrixAction(
limit: limit ?? undefined, limit: limit ?? undefined,
before: before ?? undefined, before: before ?? undefined,
after: after ?? undefined, after: after ?? undefined,
accountId,
}); });
return jsonResult({ ok: true, ...result }); return jsonResult({ ok: true, ...result });
} }
@ -127,15 +127,15 @@ export async function handleMatrixAction(
const roomId = readRoomId(params); const roomId = readRoomId(params);
if (action === "pinMessage") { if (action === "pinMessage") {
const messageId = readStringParam(params, "messageId", { required: true }); const messageId = readStringParam(params, "messageId", { required: true });
const result = await pinMatrixMessage(roomId, messageId); const result = await pinMatrixMessage(roomId, messageId, { accountId });
return jsonResult({ ok: true, pinned: result.pinned }); return jsonResult({ ok: true, pinned: result.pinned });
} }
if (action === "unpinMessage") { if (action === "unpinMessage") {
const messageId = readStringParam(params, "messageId", { required: true }); const messageId = readStringParam(params, "messageId", { required: true });
const result = await unpinMatrixMessage(roomId, messageId); const result = await unpinMatrixMessage(roomId, messageId, { accountId });
return jsonResult({ ok: true, pinned: result.pinned }); return jsonResult({ ok: true, pinned: result.pinned });
} }
const result = await listMatrixPins(roomId); const result = await listMatrixPins(roomId, { accountId });
return jsonResult({ ok: true, pinned: result.pinned, events: result.events }); return jsonResult({ ok: true, pinned: result.pinned, events: result.events });
} }
@ -147,6 +147,7 @@ export async function handleMatrixAction(
const roomId = readStringParam(params, "roomId") ?? readStringParam(params, "channelId"); const roomId = readStringParam(params, "roomId") ?? readStringParam(params, "channelId");
const result = await getMatrixMemberInfo(userId, { const result = await getMatrixMemberInfo(userId, {
roomId: roomId ?? undefined, roomId: roomId ?? undefined,
accountId,
}); });
return jsonResult({ ok: true, member: result }); return jsonResult({ ok: true, member: result });
} }
@ -156,7 +157,7 @@ export async function handleMatrixAction(
throw new Error("Matrix room info is disabled."); throw new Error("Matrix room info is disabled.");
} }
const roomId = readRoomId(params); const roomId = readRoomId(params);
const result = await getMatrixRoomInfo(roomId); const result = await getMatrixRoomInfo(roomId, { accountId });
return jsonResult({ ok: true, room: result }); return jsonResult({ ok: true, room: result });
} }

View file

@ -38,10 +38,10 @@ export type MatrixActionConfig = {
channelInfo?: boolean; channelInfo?: boolean;
}; };
export type MatrixConfig = { export type MatrixAccountConfig = {
/** Optional display name for this account (used in CLI/UI lists). */ /** Optional display name for this account (used in CLI/UI lists). */
name?: string; name?: string;
/** If false, do not start Matrix. Default: true. */ /** If false, do not start this account. Default: true. */
enabled?: boolean; enabled?: boolean;
/** Matrix homeserver URL (https://matrix.example.org). */ /** Matrix homeserver URL (https://matrix.example.org). */
homeserver?: string; homeserver?: string;
@ -87,9 +87,22 @@ export type MatrixConfig = {
actions?: MatrixActionConfig; actions?: MatrixActionConfig;
}; };
export type MatrixConfig = {
/** Optional per-account Matrix configuration (multi-account). */
accounts?: Record<string, MatrixAccountConfig>;
} & MatrixAccountConfig;
export type CoreConfig = { export type CoreConfig = {
channels?: { channels?: {
matrix?: MatrixConfig; matrix?: MatrixConfig;
}; };
bindings?: Array<{
agentId?: string;
match?: {
channel?: string;
accountId?: string;
peer?: { kind?: string; id?: string };
};
}>;
[key: string]: unknown; [key: string]: unknown;
}; };