#!/usr/bin/env python3
"""Alert Aggregator — Collects alerts from all sources into a dashboard.

Sources currently checked: NATS, Docker, Disk, and systemd user services
(see ``aggregate_all``).

Usage:
    cortex alert [--json] [--quiet]
"""

import argparse
import json
import logging
import os
import shutil
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

from cortex.config import cortex_home, memory_dir

logger = logging.getLogger(__name__)

# Sort order for alerts: most severe first; unknown levels sort last (3).
_LEVEL_PRIORITY = {"critical": 0, "warning": 1, "info": 2}


class Alert:
    """A single alert produced by one of the ``check_*`` functions."""

    def __init__(self, source: str, level: str, message: str,
                 timestamp: Optional[datetime] = None):
        self.source = source
        self.level = level  # critical, warning, info
        self.message = message
        # Defaults to local (naive) time at construction.
        self.timestamp = timestamp or datetime.now()

    def __repr__(self):
        icon = {"critical": "🔴", "warning": "🟡", "info": "🔵"}.get(self.level, "⚪")
        return f"{icon} [{self.source}] {self.message}"

    def to_dict(self):
        """Return a JSON-serializable dict representation of the alert."""
        return {
            "source": self.source,
            "level": self.level,
            "message": self.message,
            "timestamp": self.timestamp.isoformat(),
        }


def _run(cmd, timeout=10):
    """Run *cmd* (an argv list) and return ``(returncode, stdout, stderr)``.

    If the command cannot be started (e.g. binary not found) or exceeds
    *timeout* seconds, ``(-1, "", "")`` is returned so callers can treat it
    like any other failed command.
    """
    try:
        # errors="replace": undecodable output bytes must not fail the call.
        r = subprocess.run(cmd, capture_output=True, text=True,
                           errors="replace", timeout=timeout)
    except (OSError, subprocess.SubprocessError):
        return -1, "", ""
    return r.returncode, r.stdout, r.stderr


def _nats_binary() -> str:
    """Return the ``nats`` CLI path: ``~/bin/nats`` if present, else from PATH."""
    home_bin = Path.home() / "bin" / "nats"
    if home_bin.exists():
        return str(home_bin)
    # If nats is not on PATH either, keep the ~/bin path so _run fails and
    # the check reports the server as unreachable (original behavior).
    return shutil.which("nats") or str(home_bin)


def check_nats() -> list[Alert]:
    """Check NATS server connectivity and the ``openclaw-events`` stream.

    Emits a critical alert if the server check fails, a warning if the
    stream is unavailable, unparsable, or empty, and an info alert if the
    stream holds more than 500 MiB.
    """
    alerts = []
    nats_bin = _nats_binary()
    rc, _, _ = _run([nats_bin, "server", "check", "connection"])
    if rc != 0:
        alerts.append(Alert("NATS", "critical", "NATS Server unreachable"))
        return alerts

    rc, out, _ = _run([nats_bin, "stream", "info", "openclaw-events", "-j"])
    if rc != 0:
        alerts.append(Alert("NATS", "warning", "openclaw-events stream unavailable"))
        return alerts

    try:
        info = json.loads(out)
    except json.JSONDecodeError:
        alerts.append(Alert("NATS", "warning", "openclaw-events stream info is not valid JSON"))
        return alerts

    state = info.get("state") or {}
    messages = state.get("messages", 0)
    bytes_used = state.get("bytes", 0)
    if messages == 0:
        alerts.append(Alert("NATS", "warning", "Event Store is empty"))
    elif bytes_used > 500 * 1024 * 1024:
        alerts.append(Alert("NATS", "info",
                            f"Event Store large: {bytes_used // (1024*1024)}MB, {messages:,} events"))
    return alerts


def check_docker() -> list[Alert]:
    """Report unhealthy, stopped, or restarting containers on the Docker host.

    The host is reached over SSH; it is read from ``DARKPLEX_DOCKER_HOST``
    (default ``deploy@localhost``).
    """
    alerts = []
    host = os.environ.get("DARKPLEX_DOCKER_HOST", "deploy@localhost")
    rc, out, _ = _run(
        # BatchMode=yes: fail fast instead of blocking on a password or
        # host-key prompt until the timeout expires.
        ["ssh", "-o", "BatchMode=yes", host,
         "docker ps --format '{{.Names}}|{{.Status}}'"],
        timeout=15,
    )
    if rc != 0:
        alerts.append(Alert("Docker", "warning", "Cannot reach Docker host"))
        return alerts

    # NOTE(review): `docker ps` without `-a` lists only running containers,
    # so the "exited"/"dead" branch below will rarely, if ever, match.
    # Consider `docker ps -a` if stopped containers should raise alerts.
    for line in out.strip().splitlines():
        if "|" not in line:
            continue
        name, status = line.split("|", 1)
        status_lc = status.lower()
        if "unhealthy" in status_lc:
            alerts.append(Alert("Docker", "critical", f"{name} is unhealthy"))
        elif "exited" in status_lc or "dead" in status_lc:
            alerts.append(Alert("Docker", "critical", f"{name} is stopped"))
        elif "restarting" in status_lc:
            alerts.append(Alert("Docker", "warning", f"{name} is restarting"))
    return alerts


def check_disk() -> list[Alert]:
    """Alert when the root filesystem is >= 80% (warning) or >= 90% (critical) full."""
    alerts = []
    # -P (POSIX output) guarantees one line per filesystem; plain `df -h` may
    # put a long device name on its own line and shift the columns.
    rc, out, _ = _run(["df", "-P", "/"])
    if rc != 0:
        return alerts
    lines = out.strip().splitlines()
    if len(lines) < 2:
        return alerts
    parts = lines[1].split()
    if len(parts) < 5:
        return alerts
    try:
        usage = int(parts[4].rstrip("%"))  # Capacity column, e.g. "42%"
    except ValueError:
        return alerts  # e.g. "-" when capacity is not reported
    if usage >= 90:
        alerts.append(Alert("Disk", "critical", f"localhost: {usage}% full"))
    elif usage >= 80:
        alerts.append(Alert("Disk", "warning", f"localhost: {usage}% full"))
    return alerts


def check_services() -> list[Alert]:
    """Check systemd user services: event-consumer liveness and restart counts.

    More than 10 restarts is a warning; more than 100 is reported as a
    critical crash loop.
    """
    alerts = []
    _, out, _ = _run(["systemctl", "--user", "is-active", "event-consumer"])
    if out.strip() != "active":
        alerts.append(Alert("Services", "critical", "Event Consumer inactive"))

    # Check crash loops via systemd's NRestarts counter.
    for svc in ["event-consumer", "claudia-monitor"]:
        _, out, _ = _run(["systemctl", "--user", "show", svc, "--property=NRestarts"])
        if "NRestarts=" not in out:
            continue
        _, _, value = out.strip().partition("=")
        try:
            restarts = int(value)
        except ValueError:
            # Skip an unparsable counter instead of raising, which would
            # discard the alerts already collected above.
            continue
        if restarts > 100:
            alerts.append(Alert("Services", "critical", f"{svc}: {restarts} restarts — crash loop"))
        elif restarts > 10:
            alerts.append(Alert("Services", "warning", f"{svc}: {restarts} restarts"))
    return alerts


def aggregate_all(quiet=False) -> list[Alert]:
    """Run every check and return all alerts sorted critical → warning → info.

    A check that raises is reported as a warning alert for that source
    rather than aborting the whole run. When *quiet* is false, a progress
    message is logged (at INFO) before each check.
    """
    all_alerts = []
    checks = [
        ("NATS", check_nats),
        ("Docker", check_docker),
        ("Disk", check_disk),
        ("Services", check_services),
    ]
    for name, fn in checks:
        if not quiet:
            logger.info("Checking %s...", name)
        try:
            all_alerts.extend(fn())
        except Exception as e:  # boundary: one broken check must not hide the rest
            all_alerts.append(Alert(name, "warning", f"Check failed: {e}"))

    all_alerts.sort(key=lambda a: _LEVEL_PRIORITY.get(a.level, 3))
    return all_alerts


def format_dashboard(alerts: list[Alert]) -> str:
    """Render *alerts* as a Markdown dashboard grouped by level, with a summary line."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M")
    lines = ["# 🚨 Alert Dashboard", f"*{now}*\n"]

    if not alerts:
        lines.append("✅ **All clear!** No alerts.")
        return "\n".join(lines)

    for level, label, icon in [("critical", "Critical", "🔴"),
                               ("warning", "Warnings", "🟡"),
                               ("info", "Info", "🔵")]:
        items = [a for a in alerts if a.level == level]
        if items:
            lines.append(f"## {icon} {label}")
            for a in items:
                lines.append(f"- **[{a.source}]** {a.message}")
            lines.append("")

    c = sum(1 for a in alerts if a.level == "critical")
    w = sum(1 for a in alerts if a.level == "warning")
    i = sum(1 for a in alerts if a.level == "info")
    lines.append(f"---\n*{c} critical | {w} warnings | {i} info*")
    return "\n".join(lines)


def main():
    """CLI entry point: collect alerts and print them as Markdown or JSON."""
    parser = argparse.ArgumentParser(description="Alert Aggregator")
    parser.add_argument("--json", action="store_true",
                        help="print alerts as a JSON list")
    parser.add_argument("--quiet", "-q", action="store_true",
                        help="no progress output; show only critical and warning alerts")
    args = parser.parse_args()

    if not args.quiet:
        # Progress messages go to stderr (basicConfig default) so --json output
        # on stdout stays machine-readable. No-op if logging is already set up.
        logging.basicConfig(level=logging.INFO, format="%(message)s")

    alerts = aggregate_all(quiet=args.quiet)

    if args.quiet:
        alerts = [a for a in alerts if a.level in ("critical", "warning")]

    if args.json:
        print(json.dumps([a.to_dict() for a in alerts], indent=2))
    else:
        print(format_dashboard(alerts))


if __name__ == "__main__":
    main()