166 lines
5.1 KiB
Python
166 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Neural Monitor — Multi-Agent Event Stream Dashboard.
|
|
|
|
Shows per-agent statistics from isolated NATS streams.
|
|
|
|
Usage:
|
|
cortex monitor [--json]
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from cortex.config import cortex_home
|
|
|
|
# Default agents — override via DARKPLEX_AGENTS_CONFIG or agents.json in config dir
|
|
_DEFAULT_AGENTS = {
|
|
"main": {"name": "Main", "emoji": "🤖", "stream": "openclaw-events"},
|
|
}
|
|
|
|
def _load_agents():
|
|
"""Load agent config from file or env, fall back to defaults."""
|
|
config_path = os.environ.get("DARKPLEX_AGENTS_CONFIG", "")
|
|
if not config_path:
|
|
config_dir = os.environ.get("DARKPLEX_CONFIG_DIR", os.path.expanduser("~/.darkplex"))
|
|
config_path = os.path.join(config_dir, "agents.json")
|
|
if os.path.exists(config_path):
|
|
try:
|
|
with open(config_path) as f:
|
|
import json
|
|
return json.load(f)
|
|
except Exception:
|
|
pass
|
|
return _DEFAULT_AGENTS
|
|
|
|
AGENTS = _load_agents()
|
|
|
|
NATS_BIN = str(Path.home() / "bin" / "nats")
|
|
|
|
|
|
def _nats_json(args: list) -> dict | None:
|
|
try:
|
|
r = subprocess.run([NATS_BIN] + args + ["--json"],
|
|
capture_output=True, text=True, timeout=10)
|
|
if r.returncode == 0:
|
|
return json.loads(r.stdout)
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def get_stream_info(stream: str) -> dict:
|
|
info = _nats_json(["stream", "info", stream])
|
|
if not info:
|
|
return {"messages": 0, "bytes": 0, "last_ts": None, "subjects": 0}
|
|
state = info.get("state", {})
|
|
return {
|
|
"messages": state.get("messages", 0),
|
|
"bytes": state.get("bytes", 0),
|
|
"last_ts": state.get("last_ts"),
|
|
"subjects": state.get("num_subjects", 0),
|
|
}
|
|
|
|
|
|
def get_stream_subjects(stream: str) -> dict:
|
|
return _nats_json(["stream", "subjects", stream]) or {}
|
|
|
|
|
|
def format_bytes(b: int) -> str:
|
|
for unit in ("B", "KB", "MB", "GB"):
|
|
if b < 1024:
|
|
return f"{b:.1f} {unit}"
|
|
b /= 1024
|
|
return f"{b:.1f} TB"
|
|
|
|
|
|
def format_age(ts_str: str | None) -> str:
|
|
if not ts_str or ts_str == "0001-01-01T00:00:00Z":
|
|
return "never"
|
|
try:
|
|
ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
|
|
now = datetime.now(ts.tzinfo)
|
|
secs = (now - ts).total_seconds()
|
|
if secs < 0 or secs > 365 * 24 * 3600 * 100:
|
|
return "never"
|
|
if secs < 60:
|
|
return f"{int(secs)}s ago"
|
|
if secs < 3600:
|
|
return f"{int(secs / 60)}m ago"
|
|
if secs < 86400:
|
|
return f"{int(secs / 3600)}h ago"
|
|
return f"{int(secs / 86400)}d ago"
|
|
except Exception:
|
|
return "unknown"
|
|
|
|
|
|
def get_dashboard() -> list[dict]:
|
|
results = []
|
|
for agent_id, cfg in AGENTS.items():
|
|
info = get_stream_info(cfg["stream"])
|
|
subjects = get_stream_subjects(cfg["stream"])
|
|
|
|
msg_in = sum(v for k, v in subjects.items() if "message_in" in k)
|
|
msg_out = sum(v for k, v in subjects.items() if "message_out" in k)
|
|
tool_calls = sum(v for k, v in subjects.items() if "tool_call" in k)
|
|
lifecycle = sum(v for k, v in subjects.items() if "lifecycle" in k)
|
|
|
|
results.append({
|
|
"agent_id": agent_id,
|
|
"name": cfg["name"],
|
|
"emoji": cfg["emoji"],
|
|
"stream": cfg["stream"],
|
|
"messages": info["messages"],
|
|
"bytes": info["bytes"],
|
|
"last_ts": info["last_ts"],
|
|
"msg_in": msg_in,
|
|
"msg_out": msg_out,
|
|
"tool_calls": tool_calls,
|
|
"lifecycle": lifecycle,
|
|
})
|
|
return results
|
|
|
|
|
|
def format_dashboard(data: list[dict]) -> str:
|
|
lines = ["\n\033[1m🧠 NEURAL MONITOR - Multi-Agent Event Streams\033[0m",
|
|
f"\033[2m{'─' * 50}\033[0m\n"]
|
|
|
|
total_events = total_bytes = active = 0
|
|
for d in data:
|
|
total_events += d["messages"]
|
|
total_bytes += d["bytes"]
|
|
if d["messages"] > 0:
|
|
active += 1
|
|
|
|
lines.append(f"\033[1m{d['emoji']} {d['name']}\033[0m ({d['agent_id']})")
|
|
lines.append(f" Stream: {d['stream']}")
|
|
lines.append(f" Events: {d['messages']:,} ({format_bytes(d['bytes'])})")
|
|
lines.append(f" Last: {format_age(d['last_ts'])}")
|
|
lines.append(f" Types: 📥 {d['msg_in']} in | 📤 {d['msg_out']} out | 🔧 {d['tool_calls']} tools | 🔄 {d['lifecycle']} lifecycle")
|
|
lines.append("")
|
|
|
|
lines.append(f"\033[2m{'─' * 50}\033[0m")
|
|
lines.append(f"\033[1m📊 TOTAL:\033[0m {total_events:,} events ({format_bytes(total_bytes)}) across {active} active agents\n")
|
|
lines.append("\033[2m🔒 Each agent can only access their own stream\033[0m\n")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Neural Monitor — Multi-Agent Dashboard")
|
|
parser.add_argument("--json", action="store_true")
|
|
args = parser.parse_args()
|
|
|
|
data = get_dashboard()
|
|
|
|
if args.json:
|
|
print(json.dumps(data, indent=2))
|
|
else:
|
|
print(format_dashboard(data))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|