diff --git a/src/infra/event-context.ts b/src/infra/event-context.ts new file mode 100644 index 000000000..4c0457caf --- /dev/null +++ b/src/infra/event-context.ts @@ -0,0 +1,328 @@ +/** + * Event Context Builder for OpenClaw + * + * Phase 2 of RFC-001: Build session context from events + * + * This replaces file-based memory with event-sourced context. + * On session start, we query recent events and build a context document. + */ + +import { + connect, + type NatsConnection, + type JetStreamClient, + StringCodec, + AckPolicy, + DeliverPolicy, +} from "nats"; + +const sc = StringCodec(); + +export type EventContextConfig = { + natsUrl: string; + streamName: string; + subjectPrefix: string; +}; + +export type ContextOptions = { + /** Agent to build context for */ + agent: string; + /** Session key */ + sessionKey?: string; + /** Hours of history to include (default: 24) */ + hoursBack?: number; + /** Max events to process (default: 1000) */ + maxEvents?: number; + /** Include tool calls in context (default: false) */ + includeTools?: boolean; +}; + +export type EventContext = { + /** Recent conversation messages */ + recentMessages: ConversationMessage[]; + /** Summary of older conversations */ + conversationSummary?: string; + /** Active topics being discussed */ + activeTopics: string[]; + /** Pending decisions/questions */ + pendingItems: string[]; + /** Facts learned in this period */ + facts: string[]; + /** Timestamp of context build */ + builtAt: number; + /** Number of events processed */ + eventsProcessed: number; +}; + +export type ConversationMessage = { + timestamp: number; + role: "user" | "assistant"; + text: string; + session: string; +}; + +export type StoredEvent = { + id: string; + timestamp: number; + agent: string; + session: string; + type: string; + visibility: string; + payload: { + runId: string; + stream: string; + data: Record; + sessionKey?: string; + seq: number; + ts: number; + }; + meta: { + runId: string; + seq: number; + stream: string; + }; +}; + +/** + * Query events from NATS JetStream + */ +async function queryEvents( + config: EventContextConfig, + options: ContextOptions, +): Promise { + const nc = await connect({ servers: config.natsUrl }); + + try { + const js = nc.jetstream(); + const jsm = await nc.jetstreamManager(); + + // Check if stream exists + try { + await jsm.streams.info(config.streamName); + } catch { + console.log("[event-context] Stream not found, returning empty context"); + return []; + } + + const events: StoredEvent[] = []; + const hoursBack = options.hoursBack ?? 24; + const maxEvents = options.maxEvents ?? 1000; + const startTime = Date.now() - hoursBack * 60 * 60 * 1000; + + // Create ephemeral consumer for querying + const consumerName = `context-query-${Date.now()}`; + + // Build subject filter + const subject = options.agent + ? `${config.subjectPrefix}.${options.agent}.>` + : `${config.subjectPrefix}.>`; + + const consumer = await js.consumers.get(config.streamName, { + name: consumerName, + ack_policy: AckPolicy.None, + deliver_policy: DeliverPolicy.StartTime, + opt_start_time: new Date(startTime).toISOString(), + filter_subject: subject, + inactive_threshold: 30_000_000_000, // 30 seconds in nanos + }); + + // Fetch messages + const messages = await consumer.fetch({ max_messages: maxEvents, expires: 5000 }); + + for await (const msg of messages) { + try { + const data = JSON.parse(sc.decode(msg.data)) as StoredEvent; + + // Filter by session if specified + if (options.sessionKey && data.session !== options.sessionKey) { + continue; + } + + // Filter out tool events unless requested + if (!options.includeTools && data.type.includes("tool")) { + continue; + } + + events.push(data); + } catch (err) { + console.error("[event-context] Failed to parse event:", err); + } + } + + // Delete ephemeral consumer + try { + await jsm.consumers.delete(config.streamName, consumerName); + } catch { + // Ignore - may have auto-deleted + } + + return events; + } finally { + await nc.drain(); + } +} + +/** + * Extract conversation messages from events + */ +function extractConversations(events: StoredEvent[]): ConversationMessage[] { + const messages: ConversationMessage[] = []; + const seenTexts = new Set(); + + // Sort by timestamp + const sorted = [...events].sort((a, b) => a.timestamp - b.timestamp); + + for (const event of sorted) { + if (event.type !== "conversation.message.out") continue; + + const text = event.payload.data?.text as string; + if (!text || seenTexts.has(text)) continue; + + // Only keep final/complete messages (skip deltas) + // We detect this by checking if this is the last event with this runId + const runId = event.payload.runId; + const laterEvents = sorted.filter( + (e) => e.payload.runId === runId && e.timestamp > event.timestamp, + ); + + // If there are later events with same runId, this is a delta - skip + if (laterEvents.length > 0) continue; + + seenTexts.add(text); + messages.push({ + timestamp: event.timestamp, + role: "assistant", + text: text.trim(), + session: event.session, + }); + } + + return messages; +} + +/** + * Extract active topics from recent conversation + */ +function extractTopics(messages: ConversationMessage[]): string[] { + const topics: string[] = []; + const recentMessages = messages.slice(-10); + + // Simple keyword extraction (could be enhanced with LLM) + const topicKeywords = new Map(); + + for (const msg of recentMessages) { + // Extract capitalized phrases (likely topics) + const matches = msg.text.match(/[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*/g) || []; + for (const match of matches) { + if (match.length > 3) { + topicKeywords.set(match, (topicKeywords.get(match) || 0) + 1); + } + } + } + + // Get top topics + const sorted = [...topicKeywords.entries()].sort((a, b) => b[1] - a[1]).slice(0, 5); + + for (const [topic] of sorted) { + topics.push(topic); + } + + return topics; +} + +/** + * Build context document from events + */ +export async function buildEventContext( + config: EventContextConfig, + options: ContextOptions, +): Promise { + const events = await queryEvents(config, options); + const messages = extractConversations(events); + const topics = extractTopics(messages); + + // Split messages into recent (last 2 hours) and older + const twoHoursAgo = Date.now() - 2 * 60 * 60 * 1000; + const recentMessages = messages.filter((m) => m.timestamp > twoHoursAgo); + const olderMessages = messages.filter((m) => m.timestamp <= twoHoursAgo); + + // Create summary of older messages (simple for now) + let conversationSummary: string | undefined; + if (olderMessages.length > 0) { + const count = olderMessages.length; + const firstTime = new Date(olderMessages[0].timestamp).toLocaleString(); + conversationSummary = `[${count} earlier messages starting from ${firstTime}]`; + } + + return { + recentMessages, + conversationSummary, + activeTopics: topics, + pendingItems: [], // TODO: Extract from lifecycle events + facts: [], // TODO: Extract from fact events + builtAt: Date.now(), + eventsProcessed: events.length, + }; +} + +/** + * Format context for injection into system prompt + */ +export function formatContextForPrompt(context: EventContext): string { + const lines: string[] = []; + + lines.push("## Event-Sourced Context"); + lines.push(`Built at: ${new Date(context.builtAt).toISOString()}`); + lines.push(`Events processed: ${context.eventsProcessed}`); + lines.push(""); + + if (context.activeTopics.length > 0) { + lines.push("### Active Topics"); + for (const topic of context.activeTopics) { + lines.push(`- ${topic}`); + } + lines.push(""); + } + + if (context.conversationSummary) { + lines.push("### Earlier Conversation"); + lines.push(context.conversationSummary); + lines.push(""); + } + + if (context.recentMessages.length > 0) { + lines.push("### Recent Messages (last 2h)"); + for (const msg of context.recentMessages.slice(-5)) { + const time = new Date(msg.timestamp).toLocaleTimeString(); + const preview = msg.text.length > 100 ? msg.text.substring(0, 100) + "..." : msg.text; + lines.push(`- [${time}] ${msg.role}: ${preview}`); + } + lines.push(""); + } + + return lines.join("\n"); +} + +/** + * CLI helper to test context building + */ +export async function testContextBuild(natsUrl: string = "nats://localhost:4222"): Promise { + console.log("Building event context..."); + + const config: EventContextConfig = { + natsUrl, + streamName: "openclaw-events", + subjectPrefix: "openclaw.events", + }; + + const context = await buildEventContext(config, { + agent: "agent", + hoursBack: 1, + maxEvents: 100, + }); + + console.log("\n=== Event Context ==="); + console.log(formatContextForPrompt(context)); + console.log("\n=== Raw Context ==="); + console.log(JSON.stringify(context, null, 2)); +}