feat(core): Add Event Context Builder (Phase 2)
Some checks are pending
CI / install-check (push) Waiting to run
CI / checks (bunx tsc -p tsconfig.json --noEmit false, bun, build) (push) Waiting to run
CI / checks (pnpm build && pnpm lint, node, lint) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && bunx vitest run, bun, test) (push) Waiting to run
CI / checks (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks (pnpm format, node, format) (push) Waiting to run
CI / checks (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks (pnpm tsgo, node, tsgo) (push) Waiting to run
CI / secrets (push) Waiting to run
CI / checks-windows (pnpm build && pnpm lint, node, build & lint) (push) Waiting to run
CI / checks-windows (pnpm canvas:a2ui:bundle && pnpm test, node, test) (push) Waiting to run
CI / checks-windows (pnpm protocol:check, node, protocol) (push) Waiting to run
CI / checks-macos (pnpm test, test) (push) Waiting to run
CI / macos-app (set -euo pipefail for attempt in 1 2 3; do if swift build --package-path apps/macos --configuration release; then exit 0 fi echo "swift build failed (attempt $attempt/3). Retrying…" sleep $((attempt * 20)) done exit 1 , build) (push) Waiting to run
CI / macos-app (set -euo pipefail for attempt in 1 2 3; do if swift test --package-path apps/macos --parallel --enable-code-coverage --show-codecov-path; then exit 0 fi echo "swift test failed (attempt $attempt/3). Retrying…" sleep $((attempt … (push) Waiting to run
CI / macos-app (swiftlint --config .swiftlint.yml swiftformat --lint apps/macos/Sources --config .swiftformat , lint) (push) Waiting to run
CI / ios (push) Waiting to run
CI / android (./gradlew --no-daemon :app:assembleDebug, build) (push) Waiting to run
CI / android (./gradlew --no-daemon :app:testDebugUnitTest, test) (push) Waiting to run
Workflow Sanity / no-tabs (push) Waiting to run

- Query events from NATS JetStream
- Extract conversation messages (deduplicated)
- Extract active topics from recent messages
- Format context for system prompt injection
- CLI helper for testing

Phase 2 of RFC-001: Event-Sourced Memory
This commit is contained in:
Claudia 2026-02-02 10:43:21 +01:00
parent 43ef154c53
commit f162fa1401

328
src/infra/event-context.ts Normal file
View file

@ -0,0 +1,328 @@
/**
* Event Context Builder for OpenClaw
*
* Phase 2 of RFC-001: Build session context from events
*
* This replaces file-based memory with event-sourced context.
* On session start, we query recent events and build a context document.
*/
import {
connect,
type NatsConnection,
type JetStreamClient,
StringCodec,
AckPolicy,
DeliverPolicy,
} from "nats";
const sc = StringCodec();
export type EventContextConfig = {
natsUrl: string;
streamName: string;
subjectPrefix: string;
};
export type ContextOptions = {
/** Agent to build context for */
agent: string;
/** Session key */
sessionKey?: string;
/** Hours of history to include (default: 24) */
hoursBack?: number;
/** Max events to process (default: 1000) */
maxEvents?: number;
/** Include tool calls in context (default: false) */
includeTools?: boolean;
};
export type EventContext = {
/** Recent conversation messages */
recentMessages: ConversationMessage[];
/** Summary of older conversations */
conversationSummary?: string;
/** Active topics being discussed */
activeTopics: string[];
/** Pending decisions/questions */
pendingItems: string[];
/** Facts learned in this period */
facts: string[];
/** Timestamp of context build */
builtAt: number;
/** Number of events processed */
eventsProcessed: number;
};
export type ConversationMessage = {
timestamp: number;
role: "user" | "assistant";
text: string;
session: string;
};
export type StoredEvent = {
id: string;
timestamp: number;
agent: string;
session: string;
type: string;
visibility: string;
payload: {
runId: string;
stream: string;
data: Record<string, unknown>;
sessionKey?: string;
seq: number;
ts: number;
};
meta: {
runId: string;
seq: number;
stream: string;
};
};
/**
* Query events from NATS JetStream
*/
async function queryEvents(
config: EventContextConfig,
options: ContextOptions,
): Promise<StoredEvent[]> {
const nc = await connect({ servers: config.natsUrl });
try {
const js = nc.jetstream();
const jsm = await nc.jetstreamManager();
// Check if stream exists
try {
await jsm.streams.info(config.streamName);
} catch {
console.log("[event-context] Stream not found, returning empty context");
return [];
}
const events: StoredEvent[] = [];
const hoursBack = options.hoursBack ?? 24;
const maxEvents = options.maxEvents ?? 1000;
const startTime = Date.now() - hoursBack * 60 * 60 * 1000;
// Create ephemeral consumer for querying
const consumerName = `context-query-${Date.now()}`;
// Build subject filter
const subject = options.agent
? `${config.subjectPrefix}.${options.agent}.>`
: `${config.subjectPrefix}.>`;
const consumer = await js.consumers.get(config.streamName, {
name: consumerName,
ack_policy: AckPolicy.None,
deliver_policy: DeliverPolicy.StartTime,
opt_start_time: new Date(startTime).toISOString(),
filter_subject: subject,
inactive_threshold: 30_000_000_000, // 30 seconds in nanos
});
// Fetch messages
const messages = await consumer.fetch({ max_messages: maxEvents, expires: 5000 });
for await (const msg of messages) {
try {
const data = JSON.parse(sc.decode(msg.data)) as StoredEvent;
// Filter by session if specified
if (options.sessionKey && data.session !== options.sessionKey) {
continue;
}
// Filter out tool events unless requested
if (!options.includeTools && data.type.includes("tool")) {
continue;
}
events.push(data);
} catch (err) {
console.error("[event-context] Failed to parse event:", err);
}
}
// Delete ephemeral consumer
try {
await jsm.consumers.delete(config.streamName, consumerName);
} catch {
// Ignore - may have auto-deleted
}
return events;
} finally {
await nc.drain();
}
}
/**
* Extract conversation messages from events
*/
function extractConversations(events: StoredEvent[]): ConversationMessage[] {
const messages: ConversationMessage[] = [];
const seenTexts = new Set<string>();
// Sort by timestamp
const sorted = [...events].sort((a, b) => a.timestamp - b.timestamp);
for (const event of sorted) {
if (event.type !== "conversation.message.out") continue;
const text = event.payload.data?.text as string;
if (!text || seenTexts.has(text)) continue;
// Only keep final/complete messages (skip deltas)
// We detect this by checking if this is the last event with this runId
const runId = event.payload.runId;
const laterEvents = sorted.filter(
(e) => e.payload.runId === runId && e.timestamp > event.timestamp,
);
// If there are later events with same runId, this is a delta - skip
if (laterEvents.length > 0) continue;
seenTexts.add(text);
messages.push({
timestamp: event.timestamp,
role: "assistant",
text: text.trim(),
session: event.session,
});
}
return messages;
}
/**
* Extract active topics from recent conversation
*/
function extractTopics(messages: ConversationMessage[]): string[] {
const topics: string[] = [];
const recentMessages = messages.slice(-10);
// Simple keyword extraction (could be enhanced with LLM)
const topicKeywords = new Map<string, number>();
for (const msg of recentMessages) {
// Extract capitalized phrases (likely topics)
const matches = msg.text.match(/[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*/g) || [];
for (const match of matches) {
if (match.length > 3) {
topicKeywords.set(match, (topicKeywords.get(match) || 0) + 1);
}
}
}
// Get top topics
const sorted = [...topicKeywords.entries()].sort((a, b) => b[1] - a[1]).slice(0, 5);
for (const [topic] of sorted) {
topics.push(topic);
}
return topics;
}
/**
* Build context document from events
*/
export async function buildEventContext(
config: EventContextConfig,
options: ContextOptions,
): Promise<EventContext> {
const events = await queryEvents(config, options);
const messages = extractConversations(events);
const topics = extractTopics(messages);
// Split messages into recent (last 2 hours) and older
const twoHoursAgo = Date.now() - 2 * 60 * 60 * 1000;
const recentMessages = messages.filter((m) => m.timestamp > twoHoursAgo);
const olderMessages = messages.filter((m) => m.timestamp <= twoHoursAgo);
// Create summary of older messages (simple for now)
let conversationSummary: string | undefined;
if (olderMessages.length > 0) {
const count = olderMessages.length;
const firstTime = new Date(olderMessages[0].timestamp).toLocaleString();
conversationSummary = `[${count} earlier messages starting from ${firstTime}]`;
}
return {
recentMessages,
conversationSummary,
activeTopics: topics,
pendingItems: [], // TODO: Extract from lifecycle events
facts: [], // TODO: Extract from fact events
builtAt: Date.now(),
eventsProcessed: events.length,
};
}
/**
* Format context for injection into system prompt
*/
export function formatContextForPrompt(context: EventContext): string {
const lines: string[] = [];
lines.push("## Event-Sourced Context");
lines.push(`Built at: ${new Date(context.builtAt).toISOString()}`);
lines.push(`Events processed: ${context.eventsProcessed}`);
lines.push("");
if (context.activeTopics.length > 0) {
lines.push("### Active Topics");
for (const topic of context.activeTopics) {
lines.push(`- ${topic}`);
}
lines.push("");
}
if (context.conversationSummary) {
lines.push("### Earlier Conversation");
lines.push(context.conversationSummary);
lines.push("");
}
if (context.recentMessages.length > 0) {
lines.push("### Recent Messages (last 2h)");
for (const msg of context.recentMessages.slice(-5)) {
const time = new Date(msg.timestamp).toLocaleTimeString();
const preview = msg.text.length > 100 ? msg.text.substring(0, 100) + "..." : msg.text;
lines.push(`- [${time}] ${msg.role}: ${preview}`);
}
lines.push("");
}
return lines.join("\n");
}
/**
* CLI helper to test context building
*/
export async function testContextBuild(natsUrl: string = "nats://localhost:4222"): Promise<void> {
console.log("Building event context...");
const config: EventContextConfig = {
natsUrl,
streamName: "openclaw-events",
subjectPrefix: "openclaw.events",
};
const context = await buildEventContext(config, {
agent: "agent",
hoursBack: 1,
maxEvents: 100,
});
console.log("\n=== Event Context ===");
console.log(formatContextForPrompt(context));
console.log("\n=== Raw Context ===");
console.log(JSON.stringify(context, null, 2));
}