#!/usr/bin/env node /** * Claudia Monitor - WebSocket Bridge * Streams NATS events to the frontend + provides stats API */ import { connect, StringCodec } from 'nats' import { WebSocketServer } from 'ws' import http from 'http' const NATS_URL = process.env.NATS_URL || 'nats://localhost:4222' const WS_PORT = parseInt(process.env.WS_PORT || '8765') const HTTP_PORT = parseInt(process.env.HTTP_PORT || '8766') const sc = StringCodec() let nc = null /** * Parse NATS URL with optional credentials * Supports: nats://user:pass@host:port or nats://host:port */ function parseNatsUrl(urlString) { try { const httpUrl = urlString.replace(/^nats:\/\//, 'http://') const url = new URL(httpUrl) const servers = `${url.hostname}:${url.port || 4222}` if (url.username && url.password) { return { servers, user: decodeURIComponent(url.username), pass: decodeURIComponent(url.password), } } return { servers } } catch { return { servers: urlString.replace(/^nats:\/\//, '') } } } let stats = { total: 0, byType: {}, agents: {} } const bridgeStartTime = Date.now() // Agent configuration const AGENTS = { main: { name: 'Claudia', emoji: '🛡️', stream: 'openclaw-events' }, 'mondo-assistant': { name: 'Mona', emoji: '🌙', stream: 'events-mondo-assistant' }, vera: { name: 'Vera', emoji: '🔒', stream: 'events-vera' }, stella: { name: 'Stella', emoji: '💰', stream: 'events-stella' }, viola: { name: 'Viola', emoji: '⚙️', stream: 'events-viola' } } // Connect to NATS async function connectNats() { try { const { servers, user, pass } = parseNatsUrl(NATS_URL) const connectOpts = { servers, ...(user && pass ? { user, pass } : {}) } nc = await connect(connectOpts) console.log(`📡 Connected to NATS at ${servers}${user ? ' (authenticated)' : ''}`) // Get initial stats from stream const js = nc.jetstream() const stream = await js.streams.get('openclaw-events') const info = await stream.info({ subjects_filter: '>' }) stats.total = info.state.messages // Get subject breakdown from stream subjects try { const { execSync } = await import('child_process') // Use monitor credentials for CLI const natsCliUrl = NATS_URL.includes('@') ? `-s "${NATS_URL}"` : '' const output = execSync(`/home/keller/bin/nats ${natsCliUrl} stream subjects openclaw-events --json 2>/dev/null`).toString() const subjects = JSON.parse(output) stats.subCategories = {} if (subjects && typeof subjects === 'object') { for (const [subject, count] of Object.entries(subjects)) { // Main categories if (subject.includes('message')) stats.byType.message = (stats.byType.message || 0) + count else if (subject.includes('tool')) stats.byType.tool = (stats.byType.tool || 0) + count else if (subject.includes('knowledge')) stats.byType.knowledge = (stats.byType.knowledge || 0) + count else if (subject.includes('lifecycle')) stats.byType.lifecycle = (stats.byType.lifecycle || 0) + count // Sub-categories (last part of subject) const subCat = subject.split('.').pop() stats.subCategories[subCat] = count } } } catch (e) { console.log('Could not get subject breakdown:', e.message) } console.log(`📊 Stream has ${stats.total} events`, stats.byType) return true } catch (e) { console.error('❌ NATS connection failed:', e.message) return false } } // WebSocket server for real-time events function startWebSocket() { const wss = new WebSocketServer({ port: WS_PORT }) wss.on('connection', async (ws) => { console.log('🔌 Client connected') ws.on('message', async (msg) => { try { const data = JSON.parse(msg.toString()) if (data.action === 'subscribe' && data.pattern) { // Subscribe to NATS pattern const sub = nc.subscribe(data.pattern) console.log(`📥 Subscribed to: ${data.pattern}`) ;(async () => { for await (const m of sub) { if (ws.readyState !== 1) break const subject = m.subject let payload = {} try { payload = JSON.parse(sc.decode(m.data)) } catch { payload = { raw: sc.decode(m.data) } } // Update stats stats.total++ const type = subject.split('.')[3] || 'unknown' stats.byType[type] = (stats.byType[type] || 0) + 1 // Extract agent from payload.session: "agent:main:main" or "agent:viola:xxx" const sessionKey = payload?.session || payload?.sessionKey || '' const sessionParts = sessionKey.split(':') // Format: agent:: → we want index 1 const agentFromSession = sessionParts[1] || 'main' // Fallback: try subject (openclaw.events..) const subjectParts = subject.split('.') const agentFromSubject = subjectParts[2] // Use session first (more reliable), skip generic "agent" const agent = (agentFromSession && agentFromSession !== 'agent') ? agentFromSession : (agentFromSubject !== 'agent' ? agentFromSubject : 'main') // Send to client with explicit agent ws.send(JSON.stringify({ type: 'event', subject, agent, data: payload, timestamp: Date.now() })) } })() } } catch (e) { console.error('Message parse error:', e) } }) ws.on('close', () => { console.log('❌ Client disconnected') }) }) console.log(`🌐 WebSocket server on ws://0.0.0.0:${WS_PORT}`) } // HTTP server for stats API function startHttp() { const server = http.createServer(async (req, res) => { // CORS res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Content-Type', 'application/json') if (req.url === '/stats') { // Fetch stats from ALL agent streams try { const { execSync } = await import('child_process') stats.total = 0 stats.byType = {} stats.subCategories = {} stats.agents = {} for (const [agentId, agentConfig] of Object.entries(AGENTS)) { try { // Get stream info const infoOutput = execSync(`/home/keller/bin/nats stream info ${agentConfig.stream} --json 2>/dev/null`).toString() const info = JSON.parse(infoOutput) const messages = info?.state?.messages || 0 const bytes = info?.state?.bytes || 0 const lastTs = info?.state?.last_ts || null // Get subject breakdown let subjects = {} try { const subjectsOutput = execSync(`/home/keller/bin/nats stream subjects ${agentConfig.stream} --json 2>/dev/null`).toString() subjects = JSON.parse(subjectsOutput) || {} } catch (e) {} // Calculate event types for this agent let msgIn = 0, msgOut = 0, toolCalls = 0, lifecycle = 0 for (const [subject, count] of Object.entries(subjects)) { if (subject.includes('message_in')) msgIn += count else if (subject.includes('message_out')) msgOut += count else if (subject.includes('tool')) toolCalls += count else if (subject.includes('lifecycle')) lifecycle += count // Global stats if (subject.includes('message')) stats.byType.message = (stats.byType.message || 0) + count else if (subject.includes('tool')) stats.byType.tool = (stats.byType.tool || 0) + count else if (subject.includes('knowledge')) stats.byType.knowledge = (stats.byType.knowledge || 0) + count else if (subject.includes('lifecycle')) stats.byType.lifecycle = (stats.byType.lifecycle || 0) + count const subCat = subject.split('.').pop() stats.subCategories[subCat] = (stats.subCategories[subCat] || 0) + count } stats.agents[agentId] = { ...agentConfig, messages, bytes, lastTs, types: { msgIn, msgOut, toolCalls, lifecycle } } stats.total += messages } catch (e) { // Stream doesn't exist or error stats.agents[agentId] = { ...agentConfig, messages: 0, bytes: 0, lastTs: null, types: { msgIn: 0, msgOut: 0, toolCalls: 0, lifecycle: 0 } } } } } catch (e) { console.log('Stats refresh error:', e.message) } // Get GATEWAY uptime let gatewayUptime = null try { const { execSync } = await import('child_process') const etime = execSync('ps -o etimes= -p $(pgrep -f "openclaw-gateway" | head -1) 2>/dev/null').toString().trim() if (etime) { const seconds = parseInt(etime) gatewayUptime = { seconds, formatted: formatUptime(seconds) } } } catch (e) { const uptimeSeconds = Math.floor((Date.now() - bridgeStartTime) / 1000) gatewayUptime = { seconds: uptimeSeconds, formatted: formatUptime(uptimeSeconds) + ' (bridge)' } } res.writeHead(200) res.end(JSON.stringify({ ...stats, uptime: gatewayUptime })) } else if (req.url === '/health') { res.writeHead(200) res.end(JSON.stringify({ status: 'ok', nats: !!nc })) } else { res.writeHead(404) res.end(JSON.stringify({ error: 'Not found' })) } }) function formatUptime(seconds) { const d = Math.floor(seconds / 86400) const h = Math.floor((seconds % 86400) / 3600) const m = Math.floor((seconds % 3600) / 60) const s = seconds % 60 if (d > 0) return `${d}d ${h}h ${m}m` if (h > 0) return `${h}h ${m}m ${s}s` return `${m}m ${s}s` } server.listen(HTTP_PORT, '0.0.0.0', () => { console.log(`📊 Stats API on http://0.0.0.0:${HTTP_PORT}`) }) } // Main async function main() { console.log('🛡️ Claudia Monitor Bridge starting...') if (await connectNats()) { startWebSocket() startHttp() console.log('✅ Bridge ready!') } else { process.exit(1) } } main()