From 77dbafc77d1e8c479e448f9dc51e022d86d73e6c Mon Sep 17 00:00:00 2001 From: Claudia Date: Mon, 2 Feb 2026 09:30:36 +0100 Subject: [PATCH] feat(core): Add Event Store integration with NATS JetStream Phase 1 of RFC-001: Event-Sourced Memory - Add event-store.ts module for NATS JetStream integration - All agent events (messages, tool calls, lifecycle) are published - Configure via gateway.eventStore in config - Events are persistent and queryable - Non-blocking: failures don't affect core functionality Config example: gateway: eventStore: enabled: true natsUrl: nats://localhost:4222 streamName: openclaw-events subjectPrefix: openclaw.events Co-authored-by: Albert Hild --- package.json | 1 + src/config/zod-schema.ts | 9 ++ src/gateway/server-close.ts | 4 + src/gateway/server.impl.ts | 14 +++ src/infra/event-store.ts | 234 ++++++++++++++++++++++++++++++++++++ 5 files changed, 262 insertions(+) create mode 100644 src/infra/event-store.ts diff --git a/package.json b/package.json index 951f3d79a..bb84a5057 100644 --- a/package.json +++ b/package.json @@ -152,6 +152,7 @@ }, "dependencies": { "@agentclientprotocol/sdk": "0.13.1", + "nats": "^2.19.0", "@aws-sdk/client-bedrock": "^3.980.0", "@buape/carbon": "0.14.0", "@clack/prompts": "^1.0.0", diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index c9691e0e1..0810e85e7 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -443,6 +443,15 @@ export const OpenClawSchema = z }) .strict() .optional(), + eventStore: z + .object({ + enabled: z.boolean().optional(), + natsUrl: z.string().optional(), + streamName: z.string().optional(), + subjectPrefix: z.string().optional(), + }) + .strict() + .optional(), }) .strict() .optional(), diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index ea0323587..f31b71cd2 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -5,6 +5,7 @@ import type { HeartbeatRunner } from "../infra/heartbeat-runner.js"; import type { PluginServicesHandle } from "../plugins/services.js"; import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; import { stopGmailWatcher } from "../hooks/gmail-watcher.js"; +import { shutdownEventStore } from "../infra/event-store.js"; export function createGatewayCloseHandler(params: { bonjourStop: (() => Promise) | null; @@ -124,5 +125,8 @@ export function createGatewayCloseHandler(params: { httpServer.close((err) => (err ? reject(err) : resolve())), ); } + + // Shutdown Event Store connection + await shutdownEventStore().catch(() => {}); }; } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 9e5142f13..89e6df33c 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -18,6 +18,7 @@ import { } from "../config/config.js"; import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js"; import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js"; +import { initEventStore, shutdownEventStore } from "../infra/event-store.js"; import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js"; import { logAcceptedEnvOption } from "../infra/env.js"; import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js"; @@ -216,6 +217,19 @@ export async function startGatewayServer( } setGatewaySigusr1RestartPolicy({ allowExternal: cfgAtStart.commands?.restart === true }); initSubagentRegistry(); + + // Initialize Event Store if configured + const eventStoreConfig = cfgAtStart.gateway?.eventStore; + if (eventStoreConfig?.enabled) { + await initEventStore({ + enabled: true, + natsUrl: eventStoreConfig.natsUrl || "nats://localhost:4222", + streamName: eventStoreConfig.streamName || "openclaw-events", + subjectPrefix: eventStoreConfig.subjectPrefix || "openclaw.events", + }); + log.info("gateway: Event Store initialized"); + } + const defaultAgentId = resolveDefaultAgentId(cfgAtStart); const defaultWorkspaceDir = resolveAgentWorkspaceDir(cfgAtStart, defaultAgentId); const baseMethods = listGatewayMethods(); diff --git a/src/infra/event-store.ts b/src/infra/event-store.ts new file mode 100644 index 000000000..4c9eeb3fe --- /dev/null +++ b/src/infra/event-store.ts @@ -0,0 +1,234 @@ +/** + * Event Store Integration for OpenClaw + * + * Publishes all agent events to NATS JetStream for persistent storage. + * This enables: + * - Full audit trail of all interactions + * - Context rebuild from events (no more forgetting) + * - Multi-agent event sharing + * - Time-travel debugging + */ + +import { connect, type NatsConnection, type JetStreamClient, StringCodec } from "nats"; +import type { AgentEventPayload } from "./agent-events.js"; +import { onAgentEvent } from "./agent-events.js"; + +const sc = StringCodec(); + +export type EventStoreConfig = { + enabled: boolean; + natsUrl: string; + streamName: string; + subjectPrefix: string; +}; + +export type ClawEvent = { + id: string; + timestamp: number; + agent: string; + session: string; + type: EventType; + visibility: Visibility; + payload: AgentEventPayload; + meta: { + runId: string; + seq: number; + stream: string; + }; +}; + +export type EventType = + | "conversation.message.in" + | "conversation.message.out" + | "conversation.tool_call" + | "conversation.tool_result" + | "lifecycle.start" + | "lifecycle.end" + | "lifecycle.error"; + +export type Visibility = "public" | "internal" | "confidential"; + +let natsConnection: NatsConnection | null = null; +let jetstream: JetStreamClient | null = null; +let unsubscribe: (() => void) | null = null; +let eventStoreConfig: EventStoreConfig | null = null; + +/** + * Generate a ULID-like ID (time-sortable) + */ +function generateEventId(): string { + const timestamp = Date.now().toString(36); + const random = Math.random().toString(36).substring(2, 10); + return `${timestamp}-${random}`; +} + +/** + * Map agent event stream to our event type + */ +function mapStreamToEventType(stream: string, data: Record): EventType { + if (stream === "lifecycle") { + const phase = data?.phase as string; + if (phase === "start") return "lifecycle.start"; + if (phase === "end") return "lifecycle.end"; + if (phase === "error") return "lifecycle.error"; + return "lifecycle.start"; + } + if (stream === "tool") { + const hasResult = "result" in data || "output" in data; + return hasResult ? "conversation.tool_result" : "conversation.tool_call"; + } + if (stream === "assistant") { + return "conversation.message.out"; + } + if (stream === "error") { + return "lifecycle.error"; + } + return "conversation.message.out"; +} + +/** + * Extract agent name from session key + * Format: "main" or "agent-name:session-id" + */ +function extractAgentFromSession(sessionKey?: string): string { + if (!sessionKey) return "unknown"; + if (sessionKey === "main") return "main"; + const parts = sessionKey.split(":"); + return parts[0] || "unknown"; +} + +/** + * Convert AgentEventPayload to ClawEvent + */ +function toClawEvent(evt: AgentEventPayload): ClawEvent { + return { + id: generateEventId(), + timestamp: evt.ts, + agent: extractAgentFromSession(evt.sessionKey), + session: evt.sessionKey || "unknown", + type: mapStreamToEventType(evt.stream, evt.data), + visibility: "internal", + payload: evt, + meta: { + runId: evt.runId, + seq: evt.seq, + stream: evt.stream, + }, + }; +} + +/** + * Publish event to NATS JetStream + */ +async function publishEvent(evt: AgentEventPayload): Promise { + if (!jetstream || !eventStoreConfig) { + return; + } + + try { + const clawEvent = toClawEvent(evt); + const subject = `${eventStoreConfig.subjectPrefix}.${clawEvent.agent}.${clawEvent.type.replace(/\./g, "_")}`; + const payload = sc.encode(JSON.stringify(clawEvent)); + + await jetstream.publish(subject, payload); + } catch (err) { + // Log but don't throw — event store should never break core functionality + console.error("[event-store] Failed to publish event:", err); + } +} + +/** + * Ensure the JetStream stream exists + */ +async function ensureStream(js: JetStreamClient, config: EventStoreConfig): Promise { + const jsm = await natsConnection!.jetstreamManager(); + + try { + await jsm.streams.info(config.streamName); + } catch { + // Stream doesn't exist, create it + await jsm.streams.add({ + name: config.streamName, + subjects: [`${config.subjectPrefix}.>`], + retention: "limits" as const, + max_msgs: -1, + max_bytes: -1, + max_age: 0, // Never expire + storage: "file" as const, + num_replicas: 1, + duplicate_window: 120_000_000_000, // 2 minutes in nanoseconds + }); + console.log(`[event-store] Created stream: ${config.streamName}`); + } +} + +/** + * Initialize the event store connection + */ +export async function initEventStore(config: EventStoreConfig): Promise { + if (!config.enabled) { + console.log("[event-store] Disabled by config"); + return; + } + + try { + eventStoreConfig = config; + + // Connect to NATS + natsConnection = await connect({ servers: config.natsUrl }); + console.log(`[event-store] Connected to NATS at ${config.natsUrl}`); + + // Get JetStream client + jetstream = natsConnection.jetstream(); + + // Ensure stream exists + await ensureStream(jetstream, config); + + // Subscribe to all agent events + unsubscribe = onAgentEvent((evt) => { + // Fire and forget — don't await to avoid blocking the event loop + publishEvent(evt).catch(() => {}); + }); + + console.log("[event-store] Event listener registered"); + } catch (err) { + console.error("[event-store] Failed to initialize:", err); + // Don't throw — event store failure shouldn't prevent gateway startup + } +} + +/** + * Shutdown the event store connection + */ +export async function shutdownEventStore(): Promise { + if (unsubscribe) { + unsubscribe(); + unsubscribe = null; + } + + if (natsConnection) { + await natsConnection.drain(); + natsConnection = null; + jetstream = null; + } + + eventStoreConfig = null; + console.log("[event-store] Shutdown complete"); +} + +/** + * Check if event store is connected + */ +export function isEventStoreConnected(): boolean { + return natsConnection !== null && !natsConnection.isClosed(); +} + +/** + * Get event store status + */ +export function getEventStoreStatus(): { connected: boolean; config: EventStoreConfig | null } { + return { + connected: isEventStoreConnected(), + config: eventStoreConfig, + }; +}