fix(types): Add EventStoreConfig type + fix NATS enum imports
Some checks are pending
CI / install-check (push) Waiting to run
CI / checks (bunx tsc -p tsconfig.json --noEmit false, bun, build) (push) Waiting to run
CI / checks (pnpm build && pnpm lint, node, lint) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && bunx vitest run, bun, test) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks (pnpm format, node, format) (push) Waiting to run
CI / checks (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks (pnpm tsgo, node, tsgo) (push) Waiting to run
CI / secrets (push) Waiting to run
CI / checks-windows (pnpm build && pnpm lint, node, build & lint) (push) Waiting to run
CI / checks-windows (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks-windows (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks-macos (pnpm test, test) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift build --package-path apps/macos --configuration release; then
exit 0
fi
echo "swift build failed (attempt $attempt/3). Retrying…"
sleep $((attempt * 20))
done
exit 1
, build) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift test --package-path apps/macos --parallel --enable-code-coverage --show-codecov-path; then
exit 0
fi
echo "swift test failed (attempt $attempt/3). Retrying…"
sleep $((attempt … (push) Waiting to run
CI / macos-app (swiftlint --config .swiftlint.yml
swiftformat --lint apps/macos/Sources --config .swiftformat
, lint) (push) Waiting to run
CI / ios (push) Waiting to run
CI / android (./gradlew --no-daemon :app:assembleDebug, build) (push) Waiting to run
CI / android (./gradlew --no-daemon :app:testDebugUnitTest, test) (push) Waiting to run
Workflow Sanity / no-tabs (push) Waiting to run
Some checks are pending
CI / install-check (push) Waiting to run
CI / checks (bunx tsc -p tsconfig.json --noEmit false, bun, build) (push) Waiting to run
CI / checks (pnpm build && pnpm lint, node, lint) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && bunx vitest run, bun, test) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks (pnpm format, node, format) (push) Waiting to run
CI / checks (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks (pnpm tsgo, node, tsgo) (push) Waiting to run
CI / secrets (push) Waiting to run
CI / checks-windows (pnpm build && pnpm lint, node, build & lint) (push) Waiting to run
CI / checks-windows (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks-windows (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks-macos (pnpm test, test) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift build --package-path apps/macos --configuration release; then
exit 0
fi
echo "swift build failed (attempt $attempt/3). Retrying…"
sleep $((attempt * 20))
done
exit 1
, build) (push) Waiting to run
CI / macos-app (set -euo pipefail
for attempt in 1 2 3; do
if swift test --package-path apps/macos --parallel --enable-code-coverage --show-codecov-path; then
exit 0
fi
echo "swift test failed (attempt $attempt/3). Retrying…"
sleep $((attempt … (push) Waiting to run
CI / macos-app (swiftlint --config .swiftlint.yml
swiftformat --lint apps/macos/Sources --config .swiftformat
, lint) (push) Waiting to run
CI / ios (push) Waiting to run
CI / android (./gradlew --no-daemon :app:assembleDebug, build) (push) Waiting to run
CI / android (./gradlew --no-daemon :app:testDebugUnitTest, test) (push) Waiting to run
Workflow Sanity / no-tabs (push) Waiting to run
This commit is contained in:
parent
f1881a5094
commit
43ef154c53
3 changed files with 12702 additions and 14 deletions
12668
package-lock.json
generated
Normal file
12668
package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -207,6 +207,17 @@ export type GatewayNodesConfig = {
|
||||||
denyCommands?: string[];
|
denyCommands?: string[];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export type EventStoreConfig = {
|
||||||
|
/** Enable Event Store (NATS JetStream) integration. */
|
||||||
|
enabled?: boolean;
|
||||||
|
/** NATS server URL (default: nats://localhost:4222). */
|
||||||
|
natsUrl?: string;
|
||||||
|
/** JetStream stream name (default: openclaw-events). */
|
||||||
|
streamName?: string;
|
||||||
|
/** Subject prefix for events (default: openclaw.events). */
|
||||||
|
subjectPrefix?: string;
|
||||||
|
};
|
||||||
|
|
||||||
export type GatewayConfig = {
|
export type GatewayConfig = {
|
||||||
/** Single multiplexed port for Gateway WS + HTTP (default: 18789). */
|
/** Single multiplexed port for Gateway WS + HTTP (default: 18789). */
|
||||||
port?: number;
|
port?: number;
|
||||||
|
|
@ -235,6 +246,8 @@ export type GatewayConfig = {
|
||||||
tls?: GatewayTlsConfig;
|
tls?: GatewayTlsConfig;
|
||||||
http?: GatewayHttpConfig;
|
http?: GatewayHttpConfig;
|
||||||
nodes?: GatewayNodesConfig;
|
nodes?: GatewayNodesConfig;
|
||||||
|
/** Event Store (NATS JetStream) configuration for persistent event logging. */
|
||||||
|
eventStore?: EventStoreConfig;
|
||||||
/**
|
/**
|
||||||
* IPs of trusted reverse proxies (e.g. Traefik, nginx). When a connection
|
* IPs of trusted reverse proxies (e.g. Traefik, nginx). When a connection
|
||||||
* arrives from one of these IPs, the Gateway trusts `x-forwarded-for` (or
|
* arrives from one of these IPs, the Gateway trusts `x-forwarded-for` (or
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
/**
|
/**
|
||||||
* Event Store Integration for OpenClaw
|
* Event Store Integration for OpenClaw
|
||||||
*
|
*
|
||||||
* Publishes all agent events to NATS JetStream for persistent storage.
|
* Publishes all agent events to NATS JetStream for persistent storage.
|
||||||
* This enables:
|
* This enables:
|
||||||
* - Full audit trail of all interactions
|
* - Full audit trail of all interactions
|
||||||
|
|
@ -9,7 +9,14 @@
|
||||||
* - Time-travel debugging
|
* - Time-travel debugging
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { connect, type NatsConnection, type JetStreamClient, StringCodec } from "nats";
|
import {
|
||||||
|
connect,
|
||||||
|
type NatsConnection,
|
||||||
|
type JetStreamClient,
|
||||||
|
StringCodec,
|
||||||
|
RetentionPolicy,
|
||||||
|
StorageType,
|
||||||
|
} from "nats";
|
||||||
import type { AgentEventPayload } from "./agent-events.js";
|
import type { AgentEventPayload } from "./agent-events.js";
|
||||||
import { onAgentEvent } from "./agent-events.js";
|
import { onAgentEvent } from "./agent-events.js";
|
||||||
|
|
||||||
|
|
@ -37,7 +44,7 @@ export type ClawEvent = {
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
export type EventType =
|
export type EventType =
|
||||||
| "conversation.message.in"
|
| "conversation.message.in"
|
||||||
| "conversation.message.out"
|
| "conversation.message.out"
|
||||||
| "conversation.tool_call"
|
| "conversation.tool_call"
|
||||||
|
|
@ -129,7 +136,7 @@ async function publishEvent(evt: AgentEventPayload): Promise<void> {
|
||||||
const clawEvent = toClawEvent(evt);
|
const clawEvent = toClawEvent(evt);
|
||||||
const subject = `${eventStoreConfig.subjectPrefix}.${clawEvent.agent}.${clawEvent.type.replace(/\./g, "_")}`;
|
const subject = `${eventStoreConfig.subjectPrefix}.${clawEvent.agent}.${clawEvent.type.replace(/\./g, "_")}`;
|
||||||
const payload = sc.encode(JSON.stringify(clawEvent));
|
const payload = sc.encode(JSON.stringify(clawEvent));
|
||||||
|
|
||||||
await jetstream.publish(subject, payload);
|
await jetstream.publish(subject, payload);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
// Log but don't throw — event store should never break core functionality
|
// Log but don't throw — event store should never break core functionality
|
||||||
|
|
@ -142,7 +149,7 @@ async function publishEvent(evt: AgentEventPayload): Promise<void> {
|
||||||
*/
|
*/
|
||||||
async function ensureStream(js: JetStreamClient, config: EventStoreConfig): Promise<void> {
|
async function ensureStream(js: JetStreamClient, config: EventStoreConfig): Promise<void> {
|
||||||
const jsm = await natsConnection!.jetstreamManager();
|
const jsm = await natsConnection!.jetstreamManager();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await jsm.streams.info(config.streamName);
|
await jsm.streams.info(config.streamName);
|
||||||
} catch {
|
} catch {
|
||||||
|
|
@ -150,11 +157,11 @@ async function ensureStream(js: JetStreamClient, config: EventStoreConfig): Prom
|
||||||
await jsm.streams.add({
|
await jsm.streams.add({
|
||||||
name: config.streamName,
|
name: config.streamName,
|
||||||
subjects: [`${config.subjectPrefix}.>`],
|
subjects: [`${config.subjectPrefix}.>`],
|
||||||
retention: "limits" as const,
|
retention: RetentionPolicy.Limits,
|
||||||
max_msgs: -1,
|
max_msgs: -1,
|
||||||
max_bytes: -1,
|
max_bytes: -1,
|
||||||
max_age: 0, // Never expire
|
max_age: 0, // Never expire
|
||||||
storage: "file" as const,
|
storage: StorageType.File,
|
||||||
num_replicas: 1,
|
num_replicas: 1,
|
||||||
duplicate_window: 120_000_000_000, // 2 minutes in nanoseconds
|
duplicate_window: 120_000_000_000, // 2 minutes in nanoseconds
|
||||||
});
|
});
|
||||||
|
|
@ -173,23 +180,23 @@ export async function initEventStore(config: EventStoreConfig): Promise<void> {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
eventStoreConfig = config;
|
eventStoreConfig = config;
|
||||||
|
|
||||||
// Connect to NATS
|
// Connect to NATS
|
||||||
natsConnection = await connect({ servers: config.natsUrl });
|
natsConnection = await connect({ servers: config.natsUrl });
|
||||||
console.log(`[event-store] Connected to NATS at ${config.natsUrl}`);
|
console.log(`[event-store] Connected to NATS at ${config.natsUrl}`);
|
||||||
|
|
||||||
// Get JetStream client
|
// Get JetStream client
|
||||||
jetstream = natsConnection.jetstream();
|
jetstream = natsConnection.jetstream();
|
||||||
|
|
||||||
// Ensure stream exists
|
// Ensure stream exists
|
||||||
await ensureStream(jetstream, config);
|
await ensureStream(jetstream, config);
|
||||||
|
|
||||||
// Subscribe to all agent events
|
// Subscribe to all agent events
|
||||||
unsubscribe = onAgentEvent((evt) => {
|
unsubscribe = onAgentEvent((evt) => {
|
||||||
// Fire and forget — don't await to avoid blocking the event loop
|
// Fire and forget — don't await to avoid blocking the event loop
|
||||||
publishEvent(evt).catch(() => {});
|
publishEvent(evt).catch(() => {});
|
||||||
});
|
});
|
||||||
|
|
||||||
console.log("[event-store] Event listener registered");
|
console.log("[event-store] Event listener registered");
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("[event-store] Failed to initialize:", err);
|
console.error("[event-store] Failed to initialize:", err);
|
||||||
|
|
@ -205,13 +212,13 @@ export async function shutdownEventStore(): Promise<void> {
|
||||||
unsubscribe();
|
unsubscribe();
|
||||||
unsubscribe = null;
|
unsubscribe = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (natsConnection) {
|
if (natsConnection) {
|
||||||
await natsConnection.drain();
|
await natsConnection.drain();
|
||||||
natsConnection = null;
|
natsConnection = null;
|
||||||
jetstream = null;
|
jetstream = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
eventStoreConfig = null;
|
eventStoreConfig = null;
|
||||||
console.log("[event-store] Shutdown complete");
|
console.log("[event-store] Shutdown complete");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue