darkplex-core/cortex/monitor.py
Claudia fda607c204
All checks were successful
Tests / test (push) Successful in 4s
fix: sync missing import os + stray } from darkplex-core PR #2 (YesMan)
2026-02-11 20:25:29 +01:00

166 lines
5.1 KiB
Python

#!/usr/bin/env python3
"""Neural Monitor — Multi-Agent Event Stream Dashboard.
Shows per-agent statistics from isolated NATS streams.

Usage:
    cortex monitor [--json]
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from cortex.config import cortex_home
# Default agents — override via DARKPLEX_AGENTS_CONFIG or agents.json in config dir.
# Maps an agent id to its display name, its emoji, and the name of the stream
# whose statistics are shown for that agent.
_DEFAULT_AGENTS: dict[str, dict[str, str]] = {
    "main": {"name": "Main", "emoji": "🤖", "stream": "openclaw-events"},
}
def _load_agents():
    """Load the agent registry from a JSON file, falling back to the defaults.

    The file path is taken from the ``DARKPLEX_AGENTS_CONFIG`` environment
    variable when it is set and non-empty. Otherwise ``agents.json`` inside
    ``DARKPLEX_CONFIG_DIR`` (default ``~/.darkplex``) is used.

    Returns:
        The parsed JSON object when the file exists, can be read, and contains
        a JSON object (``dict``); ``_DEFAULT_AGENTS`` in every other case.
        A missing file is silent; an unreadable or malformed file produces a
        one-line warning on stderr so a broken config does not go unnoticed.
    """
    config_path = os.environ.get("DARKPLEX_AGENTS_CONFIG", "")
    if not config_path:
        config_dir = os.environ.get(
            "DARKPLEX_CONFIG_DIR", os.path.expanduser("~/.darkplex")
        )
        config_path = os.path.join(config_dir, "agents.json")

    try:
        # Explicit UTF-8: the config carries emoji and must not depend on the
        # platform's locale encoding.
        with open(config_path, encoding="utf-8") as f:
            agents = json.load(f)
    except FileNotFoundError:
        # No config file is the normal case — use the defaults quietly.
        return _DEFAULT_AGENTS
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(
            f"warning: ignoring agent config {config_path}: {exc}",
            file=sys.stderr,
        )
        return _DEFAULT_AGENTS

    if not isinstance(agents, dict):
        # Callers iterate the registry with .items(); anything but a JSON
        # object would crash them later.
        print(
            f"warning: ignoring agent config {config_path}: "
            f"expected a JSON object, got {type(agents).__name__}",
            file=sys.stderr,
        )
        return _DEFAULT_AGENTS
    return agents
# Agent registry, resolved once when this module is imported.
AGENTS = _load_agents()
# Path of the NATS CLI executable: "bin/nats" under the current user's home directory.
NATS_BIN: str = str(Path.home() / "bin" / "nats")
def _nats_json(args: list) -> dict | None:
try:
r = subprocess.run([NATS_BIN] + args + ["--json"],
capture_output=True, text=True, timeout=10)
if r.returncode == 0:
return json.loads(r.stdout)
except Exception:
pass
return None
def get_stream_info(stream: str) -> dict:
    """Summarise the state of one stream as reported by ``stream info``.

    Args:
        stream: Name of the stream to query.

    Returns:
        A dict with the keys ``messages``, ``bytes``, ``last_ts`` and
        ``subjects``. Fields that are missing — or every field, when the
        query yields nothing — default to ``0``, ``0``, ``None`` and ``0``.
    """
    payload = _nats_json(["stream", "info", stream])
    # An empty state makes every lookup below fall through to its default,
    # which is exactly the "no information" result.
    state = payload.get("state", {}) if payload else {}

    # (output key, key inside the reported state, default)
    wanted = (
        ("messages", "messages", 0),
        ("bytes", "bytes", 0),
        ("last_ts", "last_ts", None),
        ("subjects", "num_subjects", 0),
    )
    return {out_key: state.get(src_key, default) for out_key, src_key, default in wanted}
def get_stream_subjects(stream: str) -> dict:
    """Return the parsed ``stream subjects`` output for *stream*.

    Args:
        stream: Name of the stream to query.

    Returns:
        Whatever the CLI reported (presumably a subject -> message-count
        mapping, which is how ``get_dashboard`` consumes it), or an empty
        dict when the query yields nothing.
    """
    reported = _nats_json(["stream", "subjects", stream])
    if not reported:
        return {}
    return reported
def format_bytes(b: int) -> str:
    """Format a byte count with one decimal and a binary-scaled unit.

    Args:
        b: Size in bytes. Negative values are scaled by magnitude and keep
            their sign (e.g. ``-2048`` -> ``"-2.0 KB"``).

    Returns:
        A string such as ``"512.0 B"``, ``"1.5 KB"`` or ``"3.0 GB"``; values
        of 1024 GB and above are expressed in ``TB``.
    """
    size = float(b)  # work on a copy; the parameter is annotated as int
    for unit in ("B", "KB", "MB", "GB"):
        # Compare the magnitude so negative sizes pick the right unit too.
        if abs(size) < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} TB"
def format_age(ts_str: str | None) -> str:
if not ts_str or ts_str == "0001-01-01T00:00:00Z":
return "never"
try:
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
now = datetime.now(ts.tzinfo)
secs = (now - ts).total_seconds()
if secs < 0 or secs > 365 * 24 * 3600 * 100:
return "never"
if secs < 60:
return f"{int(secs)}s ago"
if secs < 3600:
return f"{int(secs / 60)}m ago"
if secs < 86400:
return f"{int(secs / 3600)}h ago"
return f"{int(secs / 86400)}d ago"
except Exception:
return "unknown"
def get_dashboard() -> list[dict]:
    """Collect one statistics row per agent configured in ``AGENTS``.

    For every agent the stream info and the per-subject counts are queried.
    Subject counts are summed into four categories by substring match on the
    subject name: ``message_in``, ``message_out``, ``tool_call`` and
    ``lifecycle``.

    Returns:
        A list of dicts with the keys ``agent_id``, ``name``, ``emoji``,
        ``stream``, ``messages``, ``bytes``, ``last_ts``, ``msg_in``,
        ``msg_out``, ``tool_calls`` and ``lifecycle``, in ``AGENTS`` order.
    """
    # (row key, substring looked for in the subject name)
    categories = (
        ("msg_in", "message_in"),
        ("msg_out", "message_out"),
        ("tool_calls", "tool_call"),
        ("lifecycle", "lifecycle"),
    )

    def tally(counts: dict, marker: str) -> int:
        # Sum the counts of all subjects whose name contains the marker.
        return sum(count for subject, count in counts.items() if marker in subject)

    rows = []
    for agent_id, cfg in AGENTS.items():
        stream = cfg["stream"]
        info = get_stream_info(stream)
        subject_counts = get_stream_subjects(stream)
        tallies = {key: tally(subject_counts, marker) for key, marker in categories}
        rows.append(
            {
                "agent_id": agent_id,
                "name": cfg["name"],
                "emoji": cfg["emoji"],
                "stream": stream,
                "messages": info["messages"],
                "bytes": info["bytes"],
                "last_ts": info["last_ts"],
                **tallies,
            }
        )
    return rows
def format_dashboard(data: list[dict]) -> str:
    """Render dashboard rows as ANSI-styled text for a terminal.

    Args:
        data: Rows as produced by ``get_dashboard``; each row must provide
            ``agent_id``, ``name``, ``emoji``, ``stream``, ``messages``,
            ``bytes``, ``last_ts``, ``msg_in``, ``msg_out``, ``tool_calls``
            and ``lifecycle``.

    Returns:
        A multi-line string: a title, one section per agent, and a totals
        line counting events, bytes, and agents with at least one message.
    """
    # Dim horizontal rule, 50 glyphs wide. Multiplying an empty string here
    # would yield an empty divider, so the glyph must be non-empty.
    divider = f"\033[2m{'─' * 50}\033[0m"

    lines = [
        "\n\033[1m🧠 NEURAL MONITOR - Multi-Agent Event Streams\033[0m",
        f"{divider}\n",
    ]
    total_events = total_bytes = active = 0
    for d in data:
        total_events += d["messages"]
        total_bytes += d["bytes"]
        if d["messages"] > 0:
            # An agent counts as active once its stream holds any message.
            active += 1
        lines.append(f"\033[1m{d['emoji']} {d['name']}\033[0m ({d['agent_id']})")
        lines.append(f" Stream: {d['stream']}")
        lines.append(f" Events: {d['messages']:,} ({format_bytes(d['bytes'])})")
        lines.append(f" Last: {format_age(d['last_ts'])}")
        lines.append(f" Types: 📥 {d['msg_in']} in | 📤 {d['msg_out']} out | 🔧 {d['tool_calls']} tools | 🔄 {d['lifecycle']} lifecycle")
        lines.append("")
    lines.append(divider)
    lines.append(f"\033[1m📊 TOTAL:\033[0m {total_events:,} events ({format_bytes(total_bytes)}) across {active} active agents\n")
    lines.append("\033[2m🔒 Each agent can only access their own stream\033[0m\n")
    return "\n".join(lines)
def main():
    """Command-line entry point: print the dashboard as text or as JSON.

    Parses ``sys.argv``; with ``--json`` the rows from ``get_dashboard`` are
    printed as indented JSON, otherwise as the formatted terminal view.
    """
    cli = argparse.ArgumentParser(description="Neural Monitor — Multi-Agent Dashboard")
    cli.add_argument("--json", action="store_true")
    options = cli.parse_args()

    snapshot = get_dashboard()
    rendered = json.dumps(snapshot, indent=2) if options.json else format_dashboard(snapshot)
    print(rendered)


# Run the dashboard only when executed as a script, not when imported.
if __name__ == "__main__":
    main()