feat(core): Add Event Store integration with NATS JetStream
Some checks are pending
CI / install-check (push) Waiting to run
CI / checks (bunx tsc -p tsconfig.json --noEmit false, bun, build) (push) Waiting to run
CI / checks (pnpm build && pnpm lint, node, lint) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && bunx vitest run, bun, test) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks (pnpm format, node, format) (push) Waiting to run
CI / checks (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks (pnpm tsgo, node, tsgo) (push) Waiting to run
CI / secrets (push) Waiting to run
CI / checks-windows (pnpm build && pnpm lint, node, build & lint) (push) Waiting to run
CI / checks-windows (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks-windows (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks-macos (pnpm test, test) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift build --package-path apps/macos --configuration release; then
exit 0
fi
echo "swift build failed (attempt $attempt/3). Retrying…"
sleep $((attempt * 20))
done
exit 1
, build) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift test --package-path apps/macos --parallel --enable-code-coverage --show-codecov-path; then
exit 0
fi
echo "swift test failed (attempt $attempt/3). Retrying…"
sleep $((attempt … (push) Waiting to run
CI / macos-app (swiftlint --config .swiftlint.yml
swiftformat --lint apps/macos/Sources --config .swiftformat
, lint) (push) Waiting to run
CI / ios (push) Waiting to run
CI / android (./gradlew --no-daemon :app:assembleDebug, build) (push) Waiting to run
CI / android (./gradlew --no-daemon :app:testDebugUnitTest, test) (push) Waiting to run
Workflow Sanity / no-tabs (push) Waiting to run
Some checks are pending
CI / install-check (push) Waiting to run
CI / checks (bunx tsc -p tsconfig.json --noEmit false, bun, build) (push) Waiting to run
CI / checks (pnpm build && pnpm lint, node, lint) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && bunx vitest run, bun, test) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks (pnpm format, node, format) (push) Waiting to run
CI / checks (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks (pnpm tsgo, node, tsgo) (push) Waiting to run
CI / secrets (push) Waiting to run
CI / checks-windows (pnpm build && pnpm lint, node, build & lint) (push) Waiting to run
CI / checks-windows (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks-windows (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks-macos (pnpm test, test) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift build --package-path apps/macos --configuration release; then
exit 0
fi
echo "swift build failed (attempt $attempt/3). Retrying…"
sleep $((attempt * 20))
done
exit 1
, build) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift test --package-path apps/macos --parallel --enable-code-coverage --show-codecov-path; then
exit 0
fi
echo "swift test failed (attempt $attempt/3). Retrying…"
sleep $((attempt … (push) Waiting to run
CI / macos-app (swiftlint --config .swiftlint.yml
swiftformat --lint apps/macos/Sources --config .swiftformat
, lint) (push) Waiting to run
CI / ios (push) Waiting to run
CI / android (./gradlew --no-daemon :app:assembleDebug, build) (push) Waiting to run
CI / android (./gradlew --no-daemon :app:testDebugUnitTest, test) (push) Waiting to run
Workflow Sanity / no-tabs (push) Waiting to run
Phase 1 of RFC-001: Event-Sourced Memory
- Add event-store.ts module for NATS JetStream integration
- All agent events (messages, tool calls, lifecycle) are published
- Configure via gateway.eventStore in config
- Events are persistent and queryable
- Non-blocking: failures don't affect core functionality
Config example:
gateway:
eventStore:
enabled: true
natsUrl: nats://localhost:4222
streamName: openclaw-events
subjectPrefix: openclaw.events
Co-authored-by: Albert Hild <albert@vainplex.de>
This commit is contained in:
parent
01449a2f41
commit
f1881a5094
5 changed files with 262 additions and 0 deletions
|
|
@ -152,6 +152,7 @@
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@agentclientprotocol/sdk": "0.13.1",
|
"@agentclientprotocol/sdk": "0.13.1",
|
||||||
|
"nats": "^2.19.0",
|
||||||
"@aws-sdk/client-bedrock": "^3.980.0",
|
"@aws-sdk/client-bedrock": "^3.980.0",
|
||||||
"@buape/carbon": "0.14.0",
|
"@buape/carbon": "0.14.0",
|
||||||
"@clack/prompts": "^1.0.0",
|
"@clack/prompts": "^1.0.0",
|
||||||
|
|
|
||||||
|
|
@ -443,6 +443,15 @@ export const OpenClawSchema = z
|
||||||
})
|
})
|
||||||
.strict()
|
.strict()
|
||||||
.optional(),
|
.optional(),
|
||||||
|
eventStore: z
|
||||||
|
.object({
|
||||||
|
enabled: z.boolean().optional(),
|
||||||
|
natsUrl: z.string().optional(),
|
||||||
|
streamName: z.string().optional(),
|
||||||
|
subjectPrefix: z.string().optional(),
|
||||||
|
})
|
||||||
|
.strict()
|
||||||
|
.optional(),
|
||||||
})
|
})
|
||||||
.strict()
|
.strict()
|
||||||
.optional(),
|
.optional(),
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import type { HeartbeatRunner } from "../infra/heartbeat-runner.js";
|
||||||
import type { PluginServicesHandle } from "../plugins/services.js";
|
import type { PluginServicesHandle } from "../plugins/services.js";
|
||||||
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
||||||
import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
||||||
|
import { shutdownEventStore } from "../infra/event-store.js";
|
||||||
|
|
||||||
export function createGatewayCloseHandler(params: {
|
export function createGatewayCloseHandler(params: {
|
||||||
bonjourStop: (() => Promise<void>) | null;
|
bonjourStop: (() => Promise<void>) | null;
|
||||||
|
|
@ -124,5 +125,8 @@ export function createGatewayCloseHandler(params: {
|
||||||
httpServer.close((err) => (err ? reject(err) : resolve())),
|
httpServer.close((err) => (err ? reject(err) : resolve())),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Shutdown Event Store connection
|
||||||
|
await shutdownEventStore().catch(() => {});
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ import {
|
||||||
} from "../config/config.js";
|
} from "../config/config.js";
|
||||||
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
|
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
|
||||||
import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js";
|
import { clearAgentRunContext, onAgentEvent } from "../infra/agent-events.js";
|
||||||
|
import { initEventStore, shutdownEventStore } from "../infra/event-store.js";
|
||||||
import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js";
|
import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js";
|
||||||
import { logAcceptedEnvOption } from "../infra/env.js";
|
import { logAcceptedEnvOption } from "../infra/env.js";
|
||||||
import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js";
|
import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js";
|
||||||
|
|
@ -216,6 +217,19 @@ export async function startGatewayServer(
|
||||||
}
|
}
|
||||||
setGatewaySigusr1RestartPolicy({ allowExternal: cfgAtStart.commands?.restart === true });
|
setGatewaySigusr1RestartPolicy({ allowExternal: cfgAtStart.commands?.restart === true });
|
||||||
initSubagentRegistry();
|
initSubagentRegistry();
|
||||||
|
|
||||||
|
// Initialize Event Store if configured
|
||||||
|
const eventStoreConfig = cfgAtStart.gateway?.eventStore;
|
||||||
|
if (eventStoreConfig?.enabled) {
|
||||||
|
await initEventStore({
|
||||||
|
enabled: true,
|
||||||
|
natsUrl: eventStoreConfig.natsUrl || "nats://localhost:4222",
|
||||||
|
streamName: eventStoreConfig.streamName || "openclaw-events",
|
||||||
|
subjectPrefix: eventStoreConfig.subjectPrefix || "openclaw.events",
|
||||||
|
});
|
||||||
|
log.info("gateway: Event Store initialized");
|
||||||
|
}
|
||||||
|
|
||||||
const defaultAgentId = resolveDefaultAgentId(cfgAtStart);
|
const defaultAgentId = resolveDefaultAgentId(cfgAtStart);
|
||||||
const defaultWorkspaceDir = resolveAgentWorkspaceDir(cfgAtStart, defaultAgentId);
|
const defaultWorkspaceDir = resolveAgentWorkspaceDir(cfgAtStart, defaultAgentId);
|
||||||
const baseMethods = listGatewayMethods();
|
const baseMethods = listGatewayMethods();
|
||||||
|
|
|
||||||
234
src/infra/event-store.ts
Normal file
234
src/infra/event-store.ts
Normal file
|
|
@ -0,0 +1,234 @@
|
||||||
|
/**
|
||||||
|
* Event Store Integration for OpenClaw
|
||||||
|
*
|
||||||
|
* Publishes all agent events to NATS JetStream for persistent storage.
|
||||||
|
* This enables:
|
||||||
|
* - Full audit trail of all interactions
|
||||||
|
* - Context rebuild from events (no more forgetting)
|
||||||
|
* - Multi-agent event sharing
|
||||||
|
* - Time-travel debugging
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { connect, type NatsConnection, type JetStreamClient, StringCodec } from "nats";
|
||||||
|
import type { AgentEventPayload } from "./agent-events.js";
|
||||||
|
import { onAgentEvent } from "./agent-events.js";
|
||||||
|
|
||||||
|
const sc = StringCodec();
|
||||||
|
|
||||||
|
export type EventStoreConfig = {
|
||||||
|
enabled: boolean;
|
||||||
|
natsUrl: string;
|
||||||
|
streamName: string;
|
||||||
|
subjectPrefix: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type ClawEvent = {
|
||||||
|
id: string;
|
||||||
|
timestamp: number;
|
||||||
|
agent: string;
|
||||||
|
session: string;
|
||||||
|
type: EventType;
|
||||||
|
visibility: Visibility;
|
||||||
|
payload: AgentEventPayload;
|
||||||
|
meta: {
|
||||||
|
runId: string;
|
||||||
|
seq: number;
|
||||||
|
stream: string;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
export type EventType =
|
||||||
|
| "conversation.message.in"
|
||||||
|
| "conversation.message.out"
|
||||||
|
| "conversation.tool_call"
|
||||||
|
| "conversation.tool_result"
|
||||||
|
| "lifecycle.start"
|
||||||
|
| "lifecycle.end"
|
||||||
|
| "lifecycle.error";
|
||||||
|
|
||||||
|
export type Visibility = "public" | "internal" | "confidential";
|
||||||
|
|
||||||
|
let natsConnection: NatsConnection | null = null;
|
||||||
|
let jetstream: JetStreamClient | null = null;
|
||||||
|
let unsubscribe: (() => void) | null = null;
|
||||||
|
let eventStoreConfig: EventStoreConfig | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate a ULID-like ID (time-sortable)
|
||||||
|
*/
|
||||||
|
function generateEventId(): string {
|
||||||
|
const timestamp = Date.now().toString(36);
|
||||||
|
const random = Math.random().toString(36).substring(2, 10);
|
||||||
|
return `${timestamp}-${random}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map agent event stream to our event type
|
||||||
|
*/
|
||||||
|
function mapStreamToEventType(stream: string, data: Record<string, unknown>): EventType {
|
||||||
|
if (stream === "lifecycle") {
|
||||||
|
const phase = data?.phase as string;
|
||||||
|
if (phase === "start") return "lifecycle.start";
|
||||||
|
if (phase === "end") return "lifecycle.end";
|
||||||
|
if (phase === "error") return "lifecycle.error";
|
||||||
|
return "lifecycle.start";
|
||||||
|
}
|
||||||
|
if (stream === "tool") {
|
||||||
|
const hasResult = "result" in data || "output" in data;
|
||||||
|
return hasResult ? "conversation.tool_result" : "conversation.tool_call";
|
||||||
|
}
|
||||||
|
if (stream === "assistant") {
|
||||||
|
return "conversation.message.out";
|
||||||
|
}
|
||||||
|
if (stream === "error") {
|
||||||
|
return "lifecycle.error";
|
||||||
|
}
|
||||||
|
return "conversation.message.out";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract agent name from session key
|
||||||
|
* Format: "main" or "agent-name:session-id"
|
||||||
|
*/
|
||||||
|
function extractAgentFromSession(sessionKey?: string): string {
|
||||||
|
if (!sessionKey) return "unknown";
|
||||||
|
if (sessionKey === "main") return "main";
|
||||||
|
const parts = sessionKey.split(":");
|
||||||
|
return parts[0] || "unknown";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert AgentEventPayload to ClawEvent
|
||||||
|
*/
|
||||||
|
function toClawEvent(evt: AgentEventPayload): ClawEvent {
|
||||||
|
return {
|
||||||
|
id: generateEventId(),
|
||||||
|
timestamp: evt.ts,
|
||||||
|
agent: extractAgentFromSession(evt.sessionKey),
|
||||||
|
session: evt.sessionKey || "unknown",
|
||||||
|
type: mapStreamToEventType(evt.stream, evt.data),
|
||||||
|
visibility: "internal",
|
||||||
|
payload: evt,
|
||||||
|
meta: {
|
||||||
|
runId: evt.runId,
|
||||||
|
seq: evt.seq,
|
||||||
|
stream: evt.stream,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish event to NATS JetStream
|
||||||
|
*/
|
||||||
|
async function publishEvent(evt: AgentEventPayload): Promise<void> {
|
||||||
|
if (!jetstream || !eventStoreConfig) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const clawEvent = toClawEvent(evt);
|
||||||
|
const subject = `${eventStoreConfig.subjectPrefix}.${clawEvent.agent}.${clawEvent.type.replace(/\./g, "_")}`;
|
||||||
|
const payload = sc.encode(JSON.stringify(clawEvent));
|
||||||
|
|
||||||
|
await jetstream.publish(subject, payload);
|
||||||
|
} catch (err) {
|
||||||
|
// Log but don't throw — event store should never break core functionality
|
||||||
|
console.error("[event-store] Failed to publish event:", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure the JetStream stream exists
|
||||||
|
*/
|
||||||
|
async function ensureStream(js: JetStreamClient, config: EventStoreConfig): Promise<void> {
|
||||||
|
const jsm = await natsConnection!.jetstreamManager();
|
||||||
|
|
||||||
|
try {
|
||||||
|
await jsm.streams.info(config.streamName);
|
||||||
|
} catch {
|
||||||
|
// Stream doesn't exist, create it
|
||||||
|
await jsm.streams.add({
|
||||||
|
name: config.streamName,
|
||||||
|
subjects: [`${config.subjectPrefix}.>`],
|
||||||
|
retention: "limits" as const,
|
||||||
|
max_msgs: -1,
|
||||||
|
max_bytes: -1,
|
||||||
|
max_age: 0, // Never expire
|
||||||
|
storage: "file" as const,
|
||||||
|
num_replicas: 1,
|
||||||
|
duplicate_window: 120_000_000_000, // 2 minutes in nanoseconds
|
||||||
|
});
|
||||||
|
console.log(`[event-store] Created stream: ${config.streamName}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the event store connection
|
||||||
|
*/
|
||||||
|
export async function initEventStore(config: EventStoreConfig): Promise<void> {
|
||||||
|
if (!config.enabled) {
|
||||||
|
console.log("[event-store] Disabled by config");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
eventStoreConfig = config;
|
||||||
|
|
||||||
|
// Connect to NATS
|
||||||
|
natsConnection = await connect({ servers: config.natsUrl });
|
||||||
|
console.log(`[event-store] Connected to NATS at ${config.natsUrl}`);
|
||||||
|
|
||||||
|
// Get JetStream client
|
||||||
|
jetstream = natsConnection.jetstream();
|
||||||
|
|
||||||
|
// Ensure stream exists
|
||||||
|
await ensureStream(jetstream, config);
|
||||||
|
|
||||||
|
// Subscribe to all agent events
|
||||||
|
unsubscribe = onAgentEvent((evt) => {
|
||||||
|
// Fire and forget — don't await to avoid blocking the event loop
|
||||||
|
publishEvent(evt).catch(() => {});
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log("[event-store] Event listener registered");
|
||||||
|
} catch (err) {
|
||||||
|
console.error("[event-store] Failed to initialize:", err);
|
||||||
|
// Don't throw — event store failure shouldn't prevent gateway startup
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown the event store connection
|
||||||
|
*/
|
||||||
|
export async function shutdownEventStore(): Promise<void> {
|
||||||
|
if (unsubscribe) {
|
||||||
|
unsubscribe();
|
||||||
|
unsubscribe = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (natsConnection) {
|
||||||
|
await natsConnection.drain();
|
||||||
|
natsConnection = null;
|
||||||
|
jetstream = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
eventStoreConfig = null;
|
||||||
|
console.log("[event-store] Shutdown complete");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if event store is connected
|
||||||
|
*/
|
||||||
|
export function isEventStoreConnected(): boolean {
|
||||||
|
return natsConnection !== null && !natsConnection.isClosed();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get event store status
|
||||||
|
*/
|
||||||
|
export function getEventStoreStatus(): { connected: boolean; config: EventStoreConfig | null } {
|
||||||
|
return {
|
||||||
|
connected: isEventStoreConnected(),
|
||||||
|
config: eventStoreConfig,
|
||||||
|
};
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue