openclaw-matrix-multiaccounts/src/config/sessions.ts
Peter Steinberger 7acd26a2fc
Move provider to a plugin-architecture (#661)
* refactor: introduce provider plugin registry

* refactor: move provider CLI to plugins

* docs: add provider plugin implementation notes

* refactor: shift provider runtime logic into plugins

* refactor: add plugin defaults and summaries

* docs: update provider plugin notes

* feat(commands): add /commands slash list

* Auto-reply: tidy help message

* Auto-reply: fix status command lint

* Tests: align google shared expectations

* Auto-reply: tidy help message

* Auto-reply: fix status command lint

* refactor: move provider routing into plugins

* test: align agent routing expectations

* docs: update provider plugin notes

* refactor: route replies via provider plugins

* docs: note route-reply plugin hooks

* refactor: extend provider plugin contract

* refactor: derive provider status from plugins

* refactor: unify gateway provider control

* refactor: use plugin metadata in auto-reply

* fix: parenthesize cron target selection

* refactor: derive gateway methods from plugins

* refactor: generalize provider logout

* refactor: route provider logout through plugins

* refactor: move WhatsApp web login methods into plugin

* refactor: generalize provider log prefixes

* refactor: centralize default chat provider

* refactor: derive provider lists from registry

* refactor: move provider reload noops into plugins

* refactor: resolve web login provider via alias

* refactor: derive CLI provider options from plugins

* refactor: derive prompt provider list from plugins

* style: apply biome lint fixes

* fix: resolve provider routing edge cases

* docs: update provider plugin refactor notes

* fix(gateway): harden agent provider routing

* refactor: move provider routing into plugins

* refactor: move provider CLI to plugins

* refactor: derive provider lists from registry

* fix: restore slash command parsing

* refactor: align provider ids for schema

* refactor: unify outbound target resolution

* fix: keep outbound labels stable

* feat: add msteams to cron surfaces

* fix: clean up lint build issues

* refactor: localize chat provider alias normalization

* refactor: drive gateway provider lists from plugins

* docs: update provider plugin notes

* style: format message-provider

* fix: avoid provider registry init cycles

* style: sort message-provider imports

* fix: relax provider alias map typing

* refactor: move provider routing into plugins

* refactor: add plugin pairing/config adapters

* refactor: route pairing and provider removal via plugins

* refactor: align auto-reply provider typing

* test: stabilize telegram media mocks

* docs: update provider plugin refactor notes

* refactor: pluginize outbound targets

* refactor: pluginize provider selection

* refactor: generalize text chunk limits

* docs: update provider plugin notes

* refactor: generalize group session/config

* fix: normalize provider id for room detection

* fix: avoid provider init in system prompt

* style: formatting cleanup

* refactor: normalize agent delivery targets

* test: update outbound delivery labels

* chore: fix lint regressions

* refactor: extend provider plugin adapters

* refactor: move elevated/block streaming defaults to plugins

* refactor: defer outbound send deps to plugins

* docs: note plugin-driven streaming/elevated defaults

* refactor: centralize webchat provider constant

* refactor: add provider setup adapters

* refactor: delegate provider add config to plugins

* docs: document plugin-driven provider add

* refactor: add plugin state/binding metadata

* refactor: build agent provider status from plugins

* docs: note plugin-driven agent bindings

* refactor: centralize internal provider constant usage

* fix: normalize WhatsApp targets for groups and E.164 (#631) (thanks @imfing)

* refactor: centralize default chat provider

* refactor: centralize WhatsApp target normalization

* refactor: move provider routing into plugins

* refactor: normalize agent delivery targets

* chore: fix lint regressions

* fix: normalize WhatsApp targets for groups and E.164 (#631) (thanks @imfing)

* feat: expand provider plugin adapters

* refactor: route auto-reply via provider plugins

* fix: align WhatsApp target normalization

* fix: normalize WhatsApp targets for groups and E.164 (#631) (thanks @imfing)

* refactor: centralize WhatsApp target normalization

* feat: add /config chat config updates

* docs: add /config get alias

* feat(commands): add /commands slash list

* refactor: centralize default chat provider

* style: apply biome lint fixes

* chore: fix lint regressions

* fix: clean up whatsapp allowlist typing

* style: format config command helpers

* refactor: pluginize tool threading context

* refactor: normalize session announce targets

* docs: note new plugin threading and announce hooks

* refactor: pluginize message actions

* docs: update provider plugin actions notes

* fix: align provider action adapters

* refactor: centralize webchat checks

* style: format message provider helpers

* refactor: move provider onboarding into adapters

* docs: note onboarding provider adapters

* feat: add msteams onboarding adapter

* style: organize onboarding imports

* fix: normalize msteams allowFrom types

* feat: add plugin text chunk limits

* refactor: use plugin chunk limit fallbacks

* feat: add provider mention stripping hooks

* style: organize provider plugin type imports

* refactor: generalize health snapshots

* refactor: update macOS health snapshot handling

* docs: refresh health snapshot notes

* style: format health snapshot updates

* refactor: drive security warnings via plugins

* docs: note provider security adapter

* style: format provider security adapters

* refactor: centralize provider account defaults

* refactor: type gateway client identity constants

* chore: regen gateway protocol swift

* fix: degrade health on failed provider probe

* refactor: centralize pairing approve hint

* docs: add plugin CLI command references

* refactor: route auth and tool sends through plugins

* docs: expand provider plugin hooks

* refactor: document provider docking touchpoints

* refactor: normalize internal provider defaults

* refactor: streamline outbound delivery wiring

* refactor: make provider onboarding plugin-owned

* refactor: support provider-owned agent tools

* refactor: move telegram draft chunking into telegram module

* refactor: infer provider tool sends via extractToolSend

* fix: repair plugin onboarding imports

* refactor: de-dup outbound target normalization

* style: tidy plugin and agent imports

* refactor: data-drive provider selection line

* fix: satisfy lint after provider plugin rebase

* test: deflake gateway-cli coverage

* style: format gateway-cli coverage test

* refactor(provider-plugins): simplify provider ids

* test(pairing-cli): avoid provider-specific ternary

* style(macos): swiftformat HealthStore

* refactor(sandbox): derive provider tool denylist

* fix(sandbox): avoid plugin init in defaults

* refactor(provider-plugins): centralize provider aliases

* style(test): satisfy biome

* refactor(protocol): v3 providers.status maps

* refactor(ui): adapt to protocol v3

* refactor(macos): adapt to protocol v3

* test: update providers.status v3 fixtures

* refactor(gateway): map provider runtime snapshot

* test(gateway): update reload runtime snapshot

* refactor(whatsapp): normalize heartbeat provider id

* docs(refactor): update provider plugin notes

* style: satisfy biome after rebase

* fix: describe sandboxed elevated in prompt

* feat(gateway): add agent image attachments + live probe

* refactor: derive CLI provider options from plugins

* fix(gateway): harden agent provider routing

* fix(gateway): harden agent provider routing

* refactor: align provider ids for schema

* fix(protocol): keep agent provider string

* fix(gateway): harden agent provider routing

* fix(protocol): keep agent provider string

* refactor: normalize agent delivery targets

* refactor: support provider-owned agent tools

* refactor(config): provider-keyed elevated allowFrom

* style: satisfy biome

* fix(gateway): appease provider narrowing

* style: satisfy biome

* refactor(reply): move group intro hints into plugin

* fix(reply): avoid plugin registry init cycle

* refactor(providers): add lightweight provider dock

* refactor(gateway): use typed client id in connect

* refactor(providers): document docks and avoid init cycles

* refactor(providers): make media limit helper generic

* fix(providers): break plugin registry import cycles

* style: satisfy biome

* refactor(status-all): build providers table from plugins

* refactor(gateway): delegate web login to provider plugin

* refactor(provider): drop web alias

* refactor(provider): lazy-load monitors

* style: satisfy lint/format

* style: format status-all providers table

* style: swiftformat gateway discovery model

* test: make reload plan plugin-driven

* fix: avoid token stringification in status-all

* refactor: make provider IDs explicit in status

* feat: warn on signal/imessage provider runtime errors

* test: cover gateway provider runtime warnings in status

* fix: add runtime kind to provider status issues

* test: cover health degradation on probe failure

* fix: keep routeReply lightweight

* style: organize routeReply imports

* refactor(web): extract auth-store helpers

* refactor(whatsapp): lazy login imports

* refactor(outbound): route replies via plugin outbound

* docs: update provider plugin notes

* style: format provider status issues

* fix: make sandbox scope warning wrap-safe

* refactor: load outbound adapters from provider plugins

* docs: update provider plugin outbound notes

* style(macos): fix swiftformat lint

* docs: changelog for provider plugins

* fix(macos): satisfy swiftformat

* fix(macos): open settings via menu action

* style: format after rebase

* fix(macos): open Settings via menu action

---------

Co-authored-by: LK <luke@kyohere.com>
Co-authored-by: Luke K (pr-0f3t) <2609441+lc0rp@users.noreply.github.com>
Co-authored-by: Xin <xin@imfing.com>
2026-01-11 11:45:25 +00:00

663 lines
19 KiB
TypeScript

import crypto from "node:crypto";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { Skill } from "@mariozechner/pi-coding-agent";
import JSON5 from "json5";
import type { MsgContext } from "../auto-reply/templating.js";
import type { ProviderId } from "../providers/plugins/types.js";
import { PROVIDER_IDS } from "../providers/registry.js";
import {
buildAgentMainSessionKey,
DEFAULT_AGENT_ID,
normalizeAgentId,
normalizeMainKey,
resolveAgentIdFromSessionKey,
} from "../routing/session-key.js";
import { normalizeE164 } from "../utils.js";
import {
getFileMtimeMs,
isCacheEnabled,
resolveCacheTtlMs,
} from "./cache-utils.js";
import { loadConfig } from "./config.js";
import { resolveStateDir } from "./paths.js";
// ============================================================================
// Session Store Cache with TTL Support
// ============================================================================
/** A cached, in-memory snapshot of one session store file. */
type SessionStoreCacheEntry = {
/** Copy of the parsed store contents, taken when the file was loaded. */
store: Record<string, SessionEntry>;
/** `Date.now()` at load time; compared against the TTL to expire the entry. */
loadedAt: number;
/** Path of the store file; the same value is used as the cache key. */
storePath: string;
/** Value returned by `getFileMtimeMs` at load time; a cache hit requires it to be unchanged. */
mtimeMs?: number;
};
/** Cached session stores, keyed by store file path. */
const SESSION_STORE_CACHE = new Map<string, SessionStoreCacheEntry>();
/** Default TTL handed to `resolveCacheTtlMs` alongside the `CLAWDBOT_SESSION_CACHE_TTL_MS` env value. */
const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s)
/**
 * Resolve the session-store cache TTL in milliseconds.
 *
 * The raw `CLAWDBOT_SESSION_CACHE_TTL_MS` environment value and the module
 * default are both handed to `resolveCacheTtlMs`, which picks the result.
 */
function getSessionStoreTtl(): number {
  const envValue = process.env.CLAWDBOT_SESSION_CACHE_TTL_MS;
  const defaultTtlMs = DEFAULT_SESSION_STORE_TTL_MS;
  return resolveCacheTtlMs({ envValue, defaultTtlMs });
}
/** Report whether the session-store cache is enabled for the currently resolved TTL. */
function isSessionStoreCacheEnabled(): boolean {
  const ttlMs = getSessionStoreTtl();
  return isCacheEnabled(ttlMs);
}
/**
 * Check whether a cache entry is still within the TTL.
 *
 * @param entry - Cache entry whose `loadedAt` timestamp is inspected.
 * @returns `true` while the entry's age does not exceed the resolved TTL.
 */
function isSessionStoreCacheValid(entry: SessionStoreCacheEntry): boolean {
  const ageMs = Date.now() - entry.loadedAt;
  return ageMs <= getSessionStoreTtl();
}
/**
 * Drop the cached snapshot (if any) for one store file.
 *
 * @param storePath - Store file path, which is also the cache key.
 */
function invalidateSessionStoreCache(storePath: string): void {
  const cacheKey = storePath;
  SESSION_STORE_CACHE.delete(cacheKey);
}
/** Empty the whole session-store cache; exported for use by tests. */
export function clearSessionStoreCacheForTest(): void {
  SESSION_STORE_CACHE.clear();
}
/** Session bucketing mode; `deriveSessionKey` maps "global" to the single "global" key. */
export type SessionScope = "per-sender" | "global";
/** Any registered provider id, plus the "webchat" surface. */
export type SessionProviderId = ProviderId | "webchat";
/** Provider prefixes that `resolveGroupSessionKey` recognises as the first `:`-separated part of a sender. */
const GROUP_SURFACES = new Set<string>([...PROVIDER_IDS, "webchat"]);
/** Conversation kind recorded on a session entry. */
export type SessionChatType = "direct" | "group" | "room";
/**
 * One record in the session store, stored under its session key.
 *
 * Only `sessionId` and `updatedAt` are required; `mergeSessionEntry` fills
 * both in when a patch does not provide them.
 */
export type SessionEntry = {
/** Session identifier; `mergeSessionEntry` generates a random UUID when none exists. */
sessionId: string;
/** Millisecond timestamp; `mergeSessionEntry` never lets it move backwards. */
updatedAt: number;
/** Explicit transcript path; when non-blank, `resolveSessionFilePath` returns it instead of the default path. */
sessionFile?: string;
/** Parent session key that spawned this session (used for sandbox session-tool scoping). */
spawnedBy?: string;
systemSent?: boolean;
abortedLastRun?: boolean;
chatType?: SessionChatType;
thinkingLevel?: string;
verboseLevel?: string;
reasoningLevel?: string;
elevatedLevel?: string;
responseUsage?: "on" | "off";
providerOverride?: string;
modelOverride?: string;
authProfileOverride?: string;
groupActivation?: "mention" | "always";
groupActivationNeedsSystemIntro?: boolean;
sendPolicy?: "allow" | "deny";
queueMode?:
| "steer"
| "followup"
| "collect"
| "steer-backlog"
| "steer+backlog"
| "queue"
| "interrupt";
queueDebounceMs?: number;
queueCap?: number;
queueDrop?: "old" | "new" | "summarize";
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;
modelProvider?: string;
model?: string;
contextTokens?: number;
compactionCount?: number;
cliSessionIds?: Record<string, string>;
claudeCliSessionId?: string;
label?: string;
displayName?: string;
provider?: string;
subject?: string;
room?: string;
space?: string;
/** Provider of the most recent route; written by `updateLastRoute`. */
lastProvider?: SessionProviderId;
/** Trimmed target of the most recent route; `updateLastRoute` resets it to `undefined` when no target is given. */
lastTo?: string;
/** Trimmed account id of the most recent route; `updateLastRoute` keeps the previous value when none is given. */
lastAccountId?: string;
skillsSnapshot?: SessionSkillSnapshot;
};
/**
 * Overlay `patch` onto `existing` and return the combined entry.
 *
 * - `sessionId` comes from the patch, then the existing entry, then a fresh UUID.
 * - `updatedAt` is the largest of the existing value, the patch value and the
 *   current time, so it never moves backwards.
 *
 * @param existing - Current entry, or `undefined` when the session is new.
 * @param patch - Fields to set; keys present in the patch win over `existing`.
 * @returns A new entry object; neither argument is mutated.
 */
export function mergeSessionEntry(
  existing: SessionEntry | undefined,
  patch: Partial<SessionEntry>,
): SessionEntry {
  const base: Partial<SessionEntry> = existing ?? {};
  const sessionId = patch.sessionId ?? base.sessionId ?? crypto.randomUUID();
  const candidates = [base.updatedAt ?? 0, patch.updatedAt ?? 0, Date.now()];
  const updatedAt = Math.max(...candidates);
  return { ...base, ...patch, sessionId, updatedAt };
}
/** Result of `resolveGroupSessionKey`. */
export type GroupKeyResolution = {
/** Session key: `<provider>:<group|channel>:<id>`, or the legacy `group:<id>` form when no provider is known. */
key: string;
/** Older `group:<id>` form of the key, when one applies. */
legacyKey?: string;
/** Provider parsed from the sender, or taken from the context's provider hint. */
provider?: string;
/** Group/channel identifier extracted from the sender. */
id?: string;
/** "room" for channel-kind keys, otherwise "group". */
chatType?: SessionChatType;
};
/**
 * Skill data stored on a session entry (`SessionEntry.skillsSnapshot`).
 * NOTE(review): the producers and consumers of this snapshot live outside this file.
 */
export type SessionSkillSnapshot = {
prompt: string;
/** Per-skill summary: a name plus an optional `primaryEnv` string. */
skills: Array<{ name: string; primaryEnv?: string }>;
/** Optional full `Skill` objects from `@mariozechner/pi-coding-agent`. */
resolvedSkills?: Skill[];
};
/**
 * Build the sessions directory for an agent: `<stateDir>/agents/<agentId>/sessions`.
 *
 * @param agentId - Agent id; falls back to `DEFAULT_AGENT_ID` and is normalized.
 * @param env - Environment passed through to `resolveStateDir`.
 * @param homedir - Home-directory resolver passed through to `resolveStateDir`.
 * @returns The joined directory path (not created here).
 */
function resolveAgentSessionsDir(
  agentId?: string,
  env: NodeJS.ProcessEnv = process.env,
  homedir: () => string = os.homedir,
): string {
  const stateRoot = resolveStateDir(env, homedir);
  const agentDirName = normalizeAgentId(agentId ?? DEFAULT_AGENT_ID);
  const segments = [stateRoot, "agents", agentDirName, "sessions"];
  return path.join(...segments);
}
/**
 * Resolve the sessions directory of the default agent.
 *
 * @param env - Environment used to locate the state directory.
 * @param homedir - Home-directory resolver used to locate the state directory.
 */
export function resolveSessionTranscriptsDir(
  env: NodeJS.ProcessEnv = process.env,
  homedir: () => string = os.homedir,
): string {
  const agentId = DEFAULT_AGENT_ID;
  return resolveAgentSessionsDir(agentId, env, homedir);
}
/**
 * Resolve the sessions directory of a specific agent.
 *
 * @param agentId - Agent id; the default agent is used when omitted.
 * @param env - Environment used to locate the state directory.
 * @param homedir - Home-directory resolver used to locate the state directory.
 */
export function resolveSessionTranscriptsDirForAgent(
  agentId?: string,
  env: NodeJS.ProcessEnv = process.env,
  homedir: () => string = os.homedir,
): string {
  const sessionsDir = resolveAgentSessionsDir(agentId, env, homedir);
  return sessionsDir;
}
/**
 * Default location of an agent's session store: `sessions.json` inside the
 * agent's sessions directory.
 *
 * @param agentId - Agent id; the default agent is used when omitted.
 */
export function resolveDefaultSessionStorePath(agentId?: string): string {
  const sessionsDir = resolveAgentSessionsDir(agentId);
  return path.join(sessionsDir, "sessions.json");
}
// Session reset/idle defaults. NOTE(review): these are only declared here; their consumers live outside this file.
/** Single default reset trigger string. */
export const DEFAULT_RESET_TRIGGER = "/new";
/** Default list of reset trigger strings. */
export const DEFAULT_RESET_TRIGGERS = ["/new", "/reset"];
/** Default idle period, in minutes. */
export const DEFAULT_IDLE_MINUTES = 60;
/**
 * Build the transcript file path for a session.
 *
 * @param sessionId - Session id used as the file stem.
 * @param agentId - Agent whose sessions directory holds the file.
 * @param topicId - When defined, `-topic-<topicId>` is appended to the stem.
 * @returns `<sessionsDir>/<sessionId>[-topic-<topicId>].jsonl`.
 */
export function resolveSessionTranscriptPath(
  sessionId: string,
  agentId?: string,
  topicId?: number,
): string {
  const topicSuffix = topicId === undefined ? "" : `-topic-${topicId}`;
  const fileName = `${sessionId}${topicSuffix}.jsonl`;
  return path.join(resolveAgentSessionsDir(agentId), fileName);
}
/**
 * Pick the transcript file for a session.
 *
 * @param sessionId - Session id used for the default transcript path.
 * @param entry - Store entry; a non-blank `sessionFile` (trimmed) takes precedence.
 * @param opts - Optional agent id used for the default transcript path.
 * @returns The entry's explicit file, else the default transcript path.
 */
export function resolveSessionFilePath(
  sessionId: string,
  entry?: SessionEntry,
  opts?: { agentId?: string },
): string {
  const explicitPath = entry?.sessionFile?.trim();
  if (explicitPath) {
    return explicitPath;
  }
  return resolveSessionTranscriptPath(sessionId, opts?.agentId);
}
/**
 * Resolve the session store path from an optional configured value.
 *
 * - No value: the agent's default store path.
 * - Every `{agentId}` placeholder is replaced by the normalized agent id.
 * - A leading `~` that stands alone or is followed by a path separator is
 *   expanded to the home directory.
 * - The result is made absolute with `path.resolve`.
 *
 * @param store - Configured store path, possibly templated.
 * @param opts - Optional agent id (defaults to `DEFAULT_AGENT_ID`).
 */
export function resolveStorePath(store?: string, opts?: { agentId?: string }) {
  const agentId = normalizeAgentId(opts?.agentId ?? DEFAULT_AGENT_ID);
  if (!store) {
    return resolveDefaultSessionStorePath(agentId);
  }
  const templated = store.includes("{agentId}")
    ? store.replaceAll("{agentId}", agentId)
    : store;
  const homeExpanded = templated.startsWith("~")
    ? templated.replace(/^~(?=$|[\\/])/, os.homedir())
    : templated;
  return path.resolve(homeExpanded);
}
/**
 * Compute the main session key for a configuration.
 *
 * With `session.scope === "global"` the key is the literal "global".
 * Otherwise the key is built for the default agent: the first agent flagged
 * `default`, else the first listed agent, else `DEFAULT_AGENT_ID`.
 *
 * @param cfg - Subset of the config holding session and agent settings.
 */
export function resolveMainSessionKey(cfg?: {
  session?: { scope?: SessionScope; mainKey?: string };
  agents?: { list?: Array<{ id?: string; default?: boolean }> };
}): string {
  const session = cfg?.session;
  if (session?.scope === "global") {
    return "global";
  }
  const agentList = cfg?.agents?.list ?? [];
  const flaggedDefault = agentList.find((agent) => agent?.default);
  const defaultAgentId =
    flaggedDefault?.id ?? agentList[0]?.id ?? DEFAULT_AGENT_ID;
  return buildAgentMainSessionKey({
    agentId: normalizeAgentId(defaultAgentId),
    mainKey: normalizeMainKey(session?.mainKey),
  });
}
/** Compute the main session key from the configuration returned by `loadConfig()`. */
export function resolveMainSessionKeyFromConfig(): string {
  const cfg = loadConfig();
  return resolveMainSessionKey(cfg);
}
// Re-exported from ../routing/session-key.js so callers can import it from this module.
export { resolveAgentIdFromSessionKey };
/**
 * Build the main session key for a specific agent.
 *
 * @param params.cfg - Optional config; only `session.mainKey` is read.
 * @param params.agentId - Agent id, passed through to the key builder as-is.
 */
export function resolveAgentMainSessionKey(params: {
  cfg?: { session?: { mainKey?: string } };
  agentId: string;
}): string {
  const { cfg, agentId } = params;
  const mainKey = normalizeMainKey(cfg?.session?.mainKey);
  return buildAgentMainSessionKey({ agentId, mainKey });
}
/**
 * Turn a free-form group label into a compact lowercase token.
 *
 * Whitespace runs become `-`; any run of characters outside
 * `a-z 0-9 # @ . _ + -` becomes `-`; repeated dashes collapse; leading and
 * trailing dashes/dots are stripped. Blank or missing input yields "".
 */
function normalizeGroupLabel(raw?: string) {
  const lowered = (raw ?? "").trim().toLowerCase();
  if (lowered.length === 0) {
    return "";
  }
  return lowered
    .replace(/\s+/g, "-")
    .replace(/[^a-z0-9#@._+-]+/g, "-")
    .replace(/-{2,}/g, "-")
    .replace(/^[-.]+|[-.]+$/g, "");
}
/**
 * Abbreviate a long identifier for display.
 *
 * Trimmed values of up to 14 characters are returned unchanged; longer ones
 * become the first 6 characters, `...`, then the last 4. Blank or missing
 * input yields "".
 */
function shortenGroupId(value?: string) {
  const id = (value ?? "").trim();
  if (id.length === 0) {
    return "";
  }
  if (id.length > 14) {
    const head = id.slice(0, 6);
    const tail = id.slice(-4);
    return `${head}...${tail}`;
  }
  return id;
}
/**
 * Build a short display name of the form `<provider>:<token>` for a group.
 *
 * The token is derived from, in order of preference: `space` + `room`
 * (joined with `#` unless the room already starts with one), `room`,
 * `subject`, `space`, the explicit `id`, or the session key with a leading
 * `group:` removed. It is normalized via `normalizeGroupLabel`. Tokens that
 * do not start with `@`/`#`, do not already start with `g-` and contain no
 * `#` get a `g-` prefix. When no token can be derived, only the provider key
 * (default "group") is returned.
 */
export function buildGroupDisplayName(params: {
  provider?: string;
  subject?: string;
  room?: string;
  space?: string;
  id?: string;
  key: string;
}) {
  const providerKey = (params.provider?.trim().toLowerCase() || "group").trim();
  const room = params.room?.trim();
  const space = params.space?.trim();
  const subject = params.subject?.trim();

  let detail: string;
  if (room && space) {
    const separator = room.startsWith("#") ? "" : "#";
    detail = `${space}${separator}${room}`;
  } else {
    detail = room || subject || space || "";
  }

  const fallbackId = params.id?.trim() || params.key.replace(/^group:/, "");
  const rawLabel = detail || fallbackId;

  let token =
    normalizeGroupLabel(rawLabel) ||
    normalizeGroupLabel(shortenGroupId(rawLabel));

  // A leading "#" is kept only when an (untrimmed) room value was supplied.
  if (!params.room && token.startsWith("#")) {
    token = token.replace(/^#+/, "");
  }

  const needsGroupPrefix =
    token &&
    !/^[@#]/.test(token) &&
    !token.startsWith("g-") &&
    !token.includes("#");
  if (needsGroupPrefix) {
    token = `g-${token}`;
  }

  if (!token) {
    return providerKey;
  }
  return `${providerKey}:${token}`;
}
/**
 * Derive the session key for a group/channel conversation from a message context.
 *
 * Returns `null` when `ctx.From` is empty or nothing marks the message as a
 * group: a `ChatType` of "group", a `group:` prefix on `From`, or `@g.us`,
 * `:group:` or `:channel:` appearing anywhere in `From`.
 *
 * When a provider is known (parsed from `From`, else `ctx.Provider`), the key
 * is `<provider>:<group|channel>:<id>`; otherwise the legacy `group:<id>` form
 * is returned as both `key` and `legacyKey`.
 */
export function resolveGroupSessionKey(
ctx: MsgContext,
): GroupKeyResolution | null {
const from = typeof ctx.From === "string" ? ctx.From.trim() : "";
if (!from) return null;
const chatType = ctx.ChatType?.trim().toLowerCase();
const isGroup =
chatType === "group" ||
from.startsWith("group:") ||
from.includes("@g.us") ||
from.includes(":group:") ||
from.includes(":channel:");
if (!isGroup) return null;
const providerHint = ctx.Provider?.trim().toLowerCase();
const hasLegacyGroupPrefix = from.startsWith("group:");
// `raw` is the sender with a leading legacy "group:" prefix removed.
const raw = (
hasLegacyGroupPrefix ? from.slice("group:".length) : from
).trim();
// Filled in by parseParts below (closures mutate these).
let provider: string | undefined;
let kind: "group" | "channel" | undefined;
let id = "";
const parseKind = (value: string) => {
if (value === "channel") return "channel";
return "group";
};
// Accepts "<provider>:<kind>:<id...>", "<provider>:<id...>" or "<kind>:<id...>";
// anything else leaves provider/kind/id untouched.
const parseParts = (parts: string[]) => {
if (parts.length >= 2 && GROUP_SURFACES.has(parts[0])) {
provider = parts[0];
if (parts.length >= 3) {
const kindCandidate = parts[1];
if (["group", "channel"].includes(kindCandidate)) {
kind = parseKind(kindCandidate);
// The id may itself contain ":"; re-join the remaining parts.
id = parts.slice(2).join(":");
} else {
id = parts.slice(1).join(":");
}
} else {
id = parts[1];
}
return;
}
if (parts.length >= 2 && ["group", "channel"].includes(parts[0])) {
kind = parseKind(parts[0]);
id = parts.slice(1).join(":");
}
};
if (hasLegacyGroupPrefix) {
const legacyParts = raw.split(":").filter(Boolean);
if (legacyParts.length > 1) {
parseParts(legacyParts);
} else {
id = raw;
}
} else if (from.includes("@g.us") && !from.includes(":")) {
// Bare "...@g.us" sender with no ":"-separated parts: the whole value is the id.
id = from;
} else {
parseParts(from.split(":").filter(Boolean));
if (!id) {
id = raw || from;
}
}
// A provider parsed from the sender wins over the context's provider hint.
const resolvedProvider = provider ?? providerHint;
if (!resolvedProvider) {
// No provider known: fall back to the legacy "group:<id>" key.
const legacy = hasLegacyGroupPrefix ? `group:${raw}` : `group:${from}`;
return {
key: legacy,
id: raw || from,
legacyKey: legacy,
chatType: "group",
};
}
const resolvedKind = kind === "channel" ? "channel" : "group";
const key = `${resolvedProvider}:${resolvedKind}:${id || raw || from}`;
let legacyKey: string | undefined;
// Only senders using the legacy prefix or containing "@g.us" also get a legacy key.
if (hasLegacyGroupPrefix || from.includes("@g.us")) {
legacyKey = `group:${id || raw || from}`;
}
return {
key,
legacyKey,
provider: resolvedProvider,
id: id || raw || from,
chatType: resolvedKind === "channel" ? "room" : "group",
};
}
/**
 * Load the session store at `storePath`, consulting the in-memory cache first.
 *
 * A cached snapshot is reused only while it is within the TTL and the file's
 * mtime still equals the one recorded when it was cached; otherwise the file
 * is re-read. A missing, unreadable or unparseable file yields an empty store,
 * as does a file whose top-level value is not a plain object.
 *
 * The returned object never shares entry objects with the cache: both the
 * cached snapshot and every cache hit are deep copies, so callers may mutate
 * entries in place without affecting later loads.
 *
 * @param storePath - Path of the session store file (parsed as JSON5).
 * @returns Map of session key to entry.
 */
export function loadSessionStore(
  storePath: string,
): Record<string, SessionEntry> {
  const cacheEnabled = isSessionStoreCacheEnabled();

  if (cacheEnabled) {
    const cached = SESSION_STORE_CACHE.get(storePath);
    if (cached && isSessionStoreCacheValid(cached)) {
      const currentMtimeMs = getFileMtimeMs(storePath);
      if (currentMtimeMs === cached.mtimeMs) {
        // Deep copy: a shallow spread would hand out the cached entry objects
        // themselves, letting in-place edits leak into subsequent cache hits.
        return structuredClone(cached.store);
      }
      // File changed on disk since it was cached.
      invalidateSessionStoreCache(storePath);
    }
  }

  // Cache miss or disabled - load from disk.
  let store: Record<string, SessionEntry> = {};
  let mtimeMs = getFileMtimeMs(storePath);
  try {
    const raw = fs.readFileSync(storePath, "utf-8");
    const parsed: unknown = JSON5.parse(raw);
    // Arrays are objects too, but cannot hold string-keyed entries once
    // serialized back to JSON; treat them as an invalid store.
    if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) {
      store = parsed as Record<string, SessionEntry>;
    }
    // Re-read the mtime after the read so the cached value matches the content.
    mtimeMs = getFileMtimeMs(storePath) ?? mtimeMs;
  } catch {
    // ignore missing/invalid store; we'll recreate it
  }

  if (cacheEnabled) {
    SESSION_STORE_CACHE.set(storePath, {
      // Detached deep copy so the caller's mutations of `store` stay private.
      store: structuredClone(store),
      loadedAt: Date.now(),
      storePath,
      mtimeMs,
    });
  }
  return store;
}
/**
 * Write the store to disk without taking the session-store lock; callers are
 * expected to hold it already.
 *
 * The cache entry for `storePath` is invalidated first. On Windows the file is
 * written directly; elsewhere the JSON is written to a temp file that is then
 * renamed over the store, and the temp file is removed afterwards in all
 * cases. `ENOENT` failures are tolerated (with one direct-write retry on the
 * rename path); every other error is rethrown.
 *
 * @param storePath - Destination store file.
 * @param store - Entries to serialize as pretty-printed JSON.
 */
async function saveSessionStoreUnlocked(
  storePath: string,
  store: Record<string, SessionEntry>,
): Promise<void> {
  // Extract a Node-style `code` from an unknown thrown value.
  const errnoOf = (err: unknown): string | null =>
    err && typeof err === "object" && "code" in err
      ? String((err as { code?: unknown }).code)
      : null;

  // Invalidate cache on write to ensure consistency.
  invalidateSessionStoreCache(storePath);

  const parentDir = path.dirname(storePath);
  await fs.promises.mkdir(parentDir, { recursive: true });
  const serialized = JSON.stringify(store, null, 2);

  // Windows: avoid atomic rename swaps (can be flaky under concurrent access).
  // Writers are serialized via the session-store lock instead.
  if (process.platform === "win32") {
    try {
      await fs.promises.writeFile(storePath, serialized, "utf-8");
    } catch (err) {
      if (errnoOf(err) !== "ENOENT") throw err;
    }
    return;
  }

  const tmpPath = `${storePath}.${process.pid}.${crypto.randomUUID()}.tmp`;
  try {
    await fs.promises.writeFile(tmpPath, serialized, "utf-8");
    await fs.promises.rename(tmpPath, storePath);
  } catch (err) {
    if (errnoOf(err) !== "ENOENT") throw err;
    // In tests the temp session-store directory may be deleted while writes are in-flight.
    // Best-effort: try a direct write (recreating the parent dir), otherwise ignore.
    try {
      await fs.promises.mkdir(parentDir, { recursive: true });
      await fs.promises.writeFile(storePath, serialized, "utf-8");
    } catch (fallbackErr) {
      if (errnoOf(fallbackErr) !== "ENOENT") throw fallbackErr;
    }
  } finally {
    await fs.promises.rm(tmpPath, { force: true });
  }
}
/**
 * Persist the store to disk while holding the session-store lock.
 *
 * @param storePath - Destination store file.
 * @param store - Entries to write.
 */
export async function saveSessionStore(
  storePath: string,
  store: Record<string, SessionEntry>,
): Promise<void> {
  const writeStore = async (): Promise<void> => {
    await saveSessionStoreUnlocked(storePath, store);
  };
  await withSessionStoreLock(storePath, writeStore);
}
/** Tuning knobs for acquiring the session-store lock file. */
type SessionStoreLockOptions = {
  /** Maximum time to keep retrying before giving up (default 10 000 ms). */
  timeoutMs?: number;
  /** Delay between acquisition attempts (default 25 ms). */
  pollIntervalMs?: number;
  /** Lock-file mtime age beyond which the lock is treated as abandoned (default 30 000 ms). */
  staleMs?: number;
};

/**
 * Run `fn` while holding an exclusive lock file at `<storePath>.lock`.
 *
 * The lock is taken by creating the file with the exclusive `wx` flag. While
 * the file already exists (`EEXIST`) or its directory is missing (`ENOENT`),
 * acquisition is retried every `pollIntervalMs` until `timeoutMs` has elapsed.
 * A lock file whose mtime is older than `staleMs` is removed and acquisition
 * is retried immediately. The lock file is removed once `fn` settles.
 *
 * @param storePath - Store file the lock protects; the lock lives next to it.
 * @param fn - Work to run while the lock is held.
 * @param opts - Optional timeout, poll interval and staleness thresholds.
 * @returns Whatever `fn` resolves to.
 * @throws Error with a "timeout acquiring session store lock" message when the
 *   lock cannot be acquired in time; any other filesystem error is rethrown.
 */
async function withSessionStoreLock<T>(
  storePath: string,
  fn: () => Promise<T>,
  opts: SessionStoreLockOptions = {},
): Promise<T> {
  const timeoutMs = opts.timeoutMs ?? 10_000;
  const pollIntervalMs = opts.pollIntervalMs ?? 25;
  const staleMs = opts.staleMs ?? 30_000;
  const lockPath = `${storePath}.lock`;
  const lockDir = path.dirname(storePath);
  const startedAt = Date.now();
  const pause = () =>
    new Promise<void>((resolve) => setTimeout(resolve, pollIntervalMs));

  await fs.promises.mkdir(lockDir, { recursive: true });

  while (true) {
    try {
      // "wx" fails with EEXIST when another holder already created the file.
      const handle = await fs.promises.open(lockPath, "wx");
      try {
        await handle.writeFile(
          JSON.stringify({ pid: process.pid, startedAt: Date.now() }),
          "utf-8",
        );
      } catch {
        // best-effort: the lock is held even if the metadata write fails
      }
      await handle.close();
      break;
    } catch (err) {
      const code =
        err && typeof err === "object" && "code" in err
          ? String((err as { code?: unknown }).code)
          : null;
      if (code !== "ENOENT" && code !== "EEXIST") throw err;

      // Enforce the timeout for BOTH retryable cases. Previously the ENOENT
      // path looped back before this check and could spin forever when the
      // directory could not be recreated.
      const now = Date.now();
      if (now - startedAt > timeoutMs) {
        throw new Error(`timeout acquiring session store lock: ${lockPath}`);
      }

      if (code === "ENOENT") {
        // Store directory may be deleted/recreated in tests while writes are in-flight.
        // Best-effort: recreate the parent dir and retry until timeout.
        await fs.promises
          .mkdir(lockDir, { recursive: true })
          .catch(() => undefined);
        await pause();
        continue;
      }

      // Best-effort stale lock eviction (e.g. crashed process).
      try {
        const st = await fs.promises.stat(lockPath);
        const ageMs = now - st.mtimeMs;
        if (ageMs > staleMs) {
          await fs.promises.unlink(lockPath);
          continue;
        }
      } catch {
        // ignore: the lock may have been released between open() and stat()
      }
      await pause();
    }
  }

  try {
    return await fn();
  } finally {
    await fs.promises.unlink(lockPath).catch(() => undefined);
  }
}
/**
 * Apply an update to an existing session entry under the session-store lock.
 *
 * @param params.storePath - Store file to load and rewrite.
 * @param params.sessionKey - Key of the entry to update.
 * @param params.update - Callback producing a patch, or `null` for "no change".
 * @returns `null` when the key has no entry; the untouched entry when the
 *   callback returns `null` (nothing is written); otherwise the merged entry
 *   after it has been saved.
 */
export async function updateSessionStoreEntry(params: {
  storePath: string;
  sessionKey: string;
  update: (entry: SessionEntry) => Promise<Partial<SessionEntry> | null>;
}): Promise<SessionEntry | null> {
  const { storePath, sessionKey, update: computePatch } = params;
  return await withSessionStoreLock(storePath, async () => {
    const store = loadSessionStore(storePath);
    const current = store[sessionKey];
    if (!current) {
      return null;
    }
    const patch = await computePatch(current);
    if (!patch) {
      return current;
    }
    const merged = mergeSessionEntry(current, patch);
    store[sessionKey] = merged;
    await saveSessionStoreUnlocked(storePath, store);
    return merged;
  });
}
/**
 * Record the most recent delivery route on a session entry, creating the
 * entry when it does not exist yet. Runs under the session-store lock.
 *
 * @param params.storePath - Store file to load and rewrite.
 * @param params.sessionKey - Key of the entry to create or update.
 * @param params.provider - Stored as `lastProvider`.
 * @param params.to - Stored trimmed as `lastTo`; blank or missing resets it to `undefined`.
 * @param params.accountId - Stored trimmed as `lastAccountId`; blank or
 *   missing keeps the previously stored value.
 * @returns The merged entry after it has been saved.
 */
export async function updateLastRoute(params: {
  storePath: string;
  sessionKey: string;
  provider: SessionEntry["lastProvider"];
  to?: string;
  accountId?: string;
}) {
  const { storePath, sessionKey, provider, to, accountId } = params;
  return await withSessionStoreLock(storePath, async () => {
    const store = loadSessionStore(storePath);
    const current = store[sessionKey];
    const now = Date.now();
    const trimmedTo = to?.trim();
    const trimmedAccountId = accountId?.trim();
    const patch: Partial<SessionEntry> = {
      updatedAt: Math.max(current?.updatedAt ?? 0, now),
      lastProvider: provider,
      lastTo: trimmedTo || undefined,
      lastAccountId: trimmedAccountId || current?.lastAccountId,
    };
    const merged = mergeSessionEntry(current, patch);
    store[sessionKey] = merged;
    await saveSessionStoreUnlocked(storePath, store);
    return merged;
  });
}
/**
 * Decide which session bucket a message belongs to.
 *
 * - "global" scope: always the "global" bucket.
 * - Group messages: the key from `resolveGroupSessionKey`.
 * - Otherwise: the sender normalized via `normalizeE164`, or "unknown" when
 *   there is no sender or normalization yields an empty value.
 */
export function deriveSessionKey(scope: SessionScope, ctx: MsgContext) {
  if (scope === "global") {
    return "global";
  }
  const group = resolveGroupSessionKey(ctx);
  if (group) {
    return group.key;
  }
  if (!ctx.From) {
    return "unknown";
  }
  return normalizeE164(ctx.From) || "unknown";
}
/**
 * Resolve the final session key for a message.
 *
 * An explicit, non-blank `ctx.SessionKey` always wins. In "global" scope the
 * derived key is returned as-is. Otherwise every non-group chat collapses
 * into the default agent's canonical main bucket, while group keys stay
 * isolated under `agent:<DEFAULT_AGENT_ID>:<derived key>`.
 *
 * @param scope - Session scope from configuration.
 * @param ctx - Message context.
 * @param mainKey - Optional main-bucket name; normalized before use.
 */
export function resolveSessionKey(
  scope: SessionScope,
  ctx: MsgContext,
  mainKey?: string,
) {
  const explicitKey = ctx.SessionKey?.trim();
  if (explicitKey) {
    return explicitKey;
  }

  const derived = deriveSessionKey(scope, ctx);
  if (scope === "global") {
    return derived;
  }

  const canonical = buildAgentMainSessionKey({
    agentId: DEFAULT_AGENT_ID,
    mainKey: normalizeMainKey(mainKey),
  });

  const embeddedGroupMarkers = [":group:", ":channel:"];
  const isGroup =
    derived.startsWith("group:") ||
    embeddedGroupMarkers.some((marker) => derived.includes(marker));

  if (isGroup) {
    return `agent:${DEFAULT_AGENT_ID}:${derived}`;
  }
  return canonical;
}