From 47f9703e3bff36932dbc940ea598ea40b96b5ef5 Mon Sep 17 00:00:00 2001 From: Claudia Date: Mon, 9 Feb 2026 16:20:22 +0100 Subject: [PATCH] feat: port needs, alert, summarize, anomaly, predict, monitor modules --- cortex/alert.py | 197 +++++++++++++++++++++++ cortex/anomaly.py | 195 +++++++++++++++++++++++ cortex/cli.py | 30 ++++ cortex/monitor.py | 151 ++++++++++++++++++ cortex/needs.py | 326 ++++++++++++++++++++++++++++++++++++++ cortex/predict.py | 229 ++++++++++++++++++++++++++ cortex/summarize.py | 208 ++++++++++++++++++++++++ tests/test_new_modules.py | 243 ++++++++++++++++++++++++++++ 8 files changed, 1579 insertions(+) create mode 100644 cortex/alert.py create mode 100644 cortex/anomaly.py create mode 100644 cortex/monitor.py create mode 100644 cortex/needs.py create mode 100644 cortex/predict.py create mode 100644 cortex/summarize.py diff --git a/cortex/alert.py b/cortex/alert.py new file mode 100644 index 0000000..b0104a2 --- /dev/null +++ b/cortex/alert.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +"""Alert Aggregator β€” Collects alerts from all sources into a dashboard. + +Sources: Sentinel, Docker, NATS, Disk, Brain Health, Email, Calendar. 
+ +Usage: + cortex alert [--json] [--quiet] +""" + +import argparse +import json +import subprocess +import sys +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional + +from cortex.config import cortex_home, memory_dir + + +class Alert: + def __init__(self, source: str, level: str, message: str, timestamp: datetime = None): + self.source = source + self.level = level # critical, warning, info + self.message = message + self.timestamp = timestamp or datetime.now() + + def __repr__(self): + icon = {"critical": "πŸ”΄", "warning": "🟑", "info": "πŸ”΅"}.get(self.level, "βšͺ") + return f"{icon} [{self.source}] {self.message}" + + def to_dict(self): + return { + "source": self.source, + "level": self.level, + "message": self.message, + "timestamp": self.timestamp.isoformat(), + } + + +def _run(cmd, timeout=10): + try: + r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) + return r.returncode, r.stdout, r.stderr + except Exception: + return -1, "", "" + + +def check_nats() -> list[Alert]: + alerts = [] + nats_bin = str(Path.home() / "bin" / "nats") + + rc, out, _ = _run([nats_bin, "server", "check", "connection"]) + if rc != 0: + alerts.append(Alert("NATS", "critical", "NATS Server unreachable")) + return alerts + + rc, out, _ = _run([nats_bin, "stream", "info", "openclaw-events", "-j"]) + if rc != 0: + alerts.append(Alert("NATS", "warning", "openclaw-events stream unavailable")) + return alerts + + info = json.loads(out) + messages = info.get("state", {}).get("messages", 0) + bytes_used = info.get("state", {}).get("bytes", 0) + + if messages == 0: + alerts.append(Alert("NATS", "warning", "Event Store is empty")) + elif bytes_used > 500 * 1024 * 1024: + alerts.append(Alert("NATS", "info", f"Event Store large: {bytes_used // (1024*1024)}MB, {messages:,} events")) + + return alerts + + +def check_docker() -> list[Alert]: + alerts = [] + rc, out, _ = _run( + ["ssh", "deploy@192.168.0.137", "docker ps --format 
'{{.Names}}|{{.Status}}'"], + timeout=15, + ) + if rc != 0: + alerts.append(Alert("Docker", "warning", "Cannot reach dock5")) + return alerts + + for line in out.strip().split("\n"): + if "|" not in line: + continue + name, status = line.split("|", 1) + if "unhealthy" in status.lower(): + alerts.append(Alert("Docker", "critical", f"{name} is unhealthy")) + elif "exited" in status.lower() or "dead" in status.lower(): + alerts.append(Alert("Docker", "critical", f"{name} is stopped")) + elif "restarting" in status.lower(): + alerts.append(Alert("Docker", "warning", f"{name} is restarting")) + + return alerts + + +def check_disk() -> list[Alert]: + alerts = [] + rc, out, _ = _run(["df", "-h", "/"]) + if rc == 0: + lines = out.strip().split("\n") + if len(lines) >= 2: + parts = lines[1].split() + if len(parts) >= 5: + usage = int(parts[4].rstrip("%")) + if usage >= 90: + alerts.append(Alert("Disk", "critical", f"localhost: {usage}% full")) + elif usage >= 80: + alerts.append(Alert("Disk", "warning", f"localhost: {usage}% full")) + return alerts + + +def check_services() -> list[Alert]: + alerts = [] + rc, out, _ = _run(["systemctl", "--user", "is-active", "event-consumer"]) + if out.strip() != "active": + alerts.append(Alert("Services", "critical", "Event Consumer inactive")) + + # Check crash loops + for svc in ["event-consumer", "claudia-monitor"]: + rc, out, _ = _run(["systemctl", "--user", "show", svc, "--property=NRestarts"]) + if "NRestarts=" in out: + restarts = int(out.strip().split("=")[1]) + if restarts > 100: + alerts.append(Alert("Services", "critical", f"{svc}: {restarts} restarts β€” crash loop")) + elif restarts > 10: + alerts.append(Alert("Services", "warning", f"{svc}: {restarts} restarts")) + + return alerts + + +def aggregate_all(quiet=False) -> list[Alert]: + all_alerts = [] + + checks = [ + ("NATS", check_nats), + ("Docker", check_docker), + ("Disk", check_disk), + ("Services", check_services), + ] + + for name, fn in checks: + if not quiet: + 
print(f"πŸ” Checking {name}...", file=sys.stderr) + try: + all_alerts.extend(fn()) + except Exception as e: + all_alerts.append(Alert(name, "warning", f"Check failed: {e}")) + + priority = {"critical": 0, "warning": 1, "info": 2} + all_alerts.sort(key=lambda a: priority.get(a.level, 3)) + return all_alerts + + +def format_dashboard(alerts: list[Alert]) -> str: + now = datetime.now().strftime("%Y-%m-%d %H:%M") + lines = [f"# 🚨 Alert Dashboard", f"*{now}*\n"] + + if not alerts: + lines.append("βœ… **All clear!** No alerts.") + return "\n".join(lines) + + for level, label, icon in [("critical", "Critical", "πŸ”΄"), ("warning", "Warnings", "🟑"), ("info", "Info", "πŸ”΅")]: + items = [a for a in alerts if a.level == level] + if items: + lines.append(f"## {icon} {label}") + for a in items: + lines.append(f"- **[{a.source}]** {a.message}") + lines.append("") + + c = sum(1 for a in alerts if a.level == "critical") + w = sum(1 for a in alerts if a.level == "warning") + i = sum(1 for a in alerts if a.level == "info") + lines.append(f"---\n*{c} critical | {w} warnings | {i} info*") + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser(description="Alert Aggregator") + parser.add_argument("--json", action="store_true") + parser.add_argument("--quiet", "-q", action="store_true") + args = parser.parse_args() + + alerts = aggregate_all(quiet=args.quiet) + if args.quiet: + alerts = [a for a in alerts if a.level in ("critical", "warning")] + + if args.json: + print(json.dumps([a.to_dict() for a in alerts], indent=2)) + else: + print(format_dashboard(alerts)) + + +if __name__ == "__main__": + main() diff --git a/cortex/anomaly.py b/cortex/anomaly.py new file mode 100644 index 0000000..1c8ea9f --- /dev/null +++ b/cortex/anomaly.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +"""Anomaly Detector β€” Real-time pattern detection and alerting. + +Detects error spikes, tool failures, repeated questions, security patterns. 
+ +Usage: + cortex anomaly [--hours N] [--json] [--alert] +""" + +import argparse +import base64 +import json +import re +import subprocess +import sys +from datetime import datetime +from pathlib import Path + +from cortex.config import cortex_home, memory_dir + +THRESHOLDS = { + "errorRate": 0.1, + "toolFailureRate": 0.2, + "repeatedQuestions": 3, + "securityKeywords": ["password", "credential", "token", "api key", "secret"], +} + + +def state_file(): + return memory_dir() / "anomaly-state.json" + + +def _load_state(): + sf = state_file() + if sf.exists(): + try: + return json.loads(sf.read_text()) + except Exception: + pass + return {"lastCheck": 0, "alertedAnomalies": []} + + +def _save_state(state): + sf = state_file() + sf.parent.mkdir(parents=True, exist_ok=True) + sf.write_text(json.dumps(state, indent=2)) + + +def fetch_events(hours: int = 1) -> list: + nats = str(Path.home() / "bin" / "nats") + cutoff_ms = int((datetime.now().timestamp() - hours * 3600) * 1000) + events = [] + + try: + r = subprocess.run([nats, "stream", "info", "openclaw-events", "--json"], + capture_output=True, text=True, timeout=10) + if r.returncode != 0: + return events + info = json.loads(r.stdout) + end_seq = info["state"]["last_seq"] + start_seq = max(info["state"]["first_seq"], end_seq - 2000) + + for seq in range(start_seq, end_seq + 1): + try: + r = subprocess.run( + [nats, "stream", "get", "openclaw-events", str(seq), "--json"], + capture_output=True, text=True, timeout=2, + ) + if r.returncode != 0: + continue + msg = json.loads(r.stdout) + data = json.loads(base64.b64decode(msg["data"]).decode("utf-8")) + ts = data.get("timestamp") or data.get("timestampMs", 0) + if isinstance(ts, (int, float)) and ts > 1e12: + pass # already ms + elif isinstance(ts, (int, float)): + ts = int(ts * 1000) + + if ts > cutoff_ms: + events.append({ + "time": ts, + "type": data.get("type", "unknown"), + "text": (data.get("payload", {}).get("data", {}).get("text", "") or ""), + "tool": 
data.get("payload", {}).get("data", {}).get("name", ""), + "isError": data.get("payload", {}).get("data", {}).get("isError", False), + "agent": data.get("agent", "unknown"), + }) + except Exception: + continue + except Exception: + pass + + return events + + +def detect_anomalies(events: list) -> list: + anomalies = [] + if len(events) < 10: + return anomalies + + # Error rate + error_count = sum(1 for e in events if "error" in e["type"] or e["isError"] or "error" in e["text"].lower()) + error_rate = error_count / len(events) + if error_rate > THRESHOLDS["errorRate"]: + anomalies.append({ + "type": "error_spike", + "severity": "critical" if error_rate > 0.3 else "warning", + "message": f"High error rate: {error_rate:.1%} ({error_count}/{len(events)} events)", + }) + + # Tool failures + tool_events = [e for e in events if e["tool"]] + tool_errors = [e for e in tool_events if e["isError"] or "error" in e["text"].lower()] + if len(tool_events) > 5: + fail_rate = len(tool_errors) / len(tool_events) + if fail_rate > THRESHOLDS["toolFailureRate"]: + anomalies.append({ + "type": "tool_failures", + "severity": "warning", + "message": f"High tool failure rate: {fail_rate:.1%}", + }) + + # Repeated questions + user_msgs = [e["text"].lower().strip() for e in events if "message" in e["type"] and "in" in e["type"]] + counts = {} + for msg in user_msgs: + if len(msg) > 10: + key = re.sub(r"[?!.,]", "", msg)[:50] + counts[key] = counts.get(key, 0) + 1 + repeated = [(k, c) for k, c in counts.items() if c >= THRESHOLDS["repeatedQuestions"]] + if repeated: + anomalies.append({ + "type": "repeated_questions", + "severity": "warning", + "message": f"User repeated questions {len(repeated)} times β€” possible frustration", + }) + + # Security exposure + for e in events: + text = e["text"] + if re.search(r"(?:password|token|key|secret)[=:]\s*[^\s]{8,}", text, re.IGNORECASE): + anomalies.append({ + "type": "security_exposure", + "severity": "critical", + "message": "Potential credential 
exposure detected", + }) + break + + return anomalies + + +def format_report(anomalies: list) -> str: + if not anomalies: + return "βœ… No anomalies detected β€” all patterns normal" + + lines = [f"⚠️ Found {len(anomalies)} anomalies:\n"] + for a in anomalies: + icon = {"critical": "πŸ”΄", "warning": "🟑"}.get(a["severity"], "πŸ”΅") + lines.append(f"{icon} [{a['severity'].upper()}] {a['type']}") + lines.append(f" {a['message']}\n") + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser(description="Anomaly Detector") + parser.add_argument("--hours", type=int, default=1) + parser.add_argument("--json", action="store_true") + parser.add_argument("--alert", action="store_true") + args = parser.parse_args() + + print("🚨 Anomaly Detector\n") + + state = _load_state() + print(f"πŸ“₯ Analyzing last {args.hours}h of events...") + events = fetch_events(args.hours) + print(f" Found {len(events)} events\n") + + if len(events) < 10: + print("βœ… Not enough events to analyze") + return + + anomalies = detect_anomalies(events) + + if args.json: + print(json.dumps(anomalies, indent=2)) + else: + print(format_report(anomalies)) + + state["lastCheck"] = int(datetime.now().timestamp() * 1000) + state["lastAnomalies"] = anomalies + _save_state(state) + + +if __name__ == "__main__": + main() diff --git a/cortex/cli.py b/cortex/cli.py index 575354d..f7cd196 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -16,6 +16,12 @@ Usage: cortex context [--events 2000] [--output file] cortex track scan|list|done|check cortex sentinel scan|matches|report|stats + cortex needs [--json] [--quiet] + cortex alert [--json] [--quiet] + cortex summarize [--date YYYY-MM-DD] [--dry-run] + cortex anomaly [--hours N] [--json] + cortex predict [--learn] [--patterns] + cortex monitor [--json] cortex version """ @@ -91,6 +97,30 @@ def main(): from cortex.sentinel import main as sentinel_main sentinel_main() + elif cmd == "needs": + from cortex.needs import main as needs_main + needs_main() 
+ + elif cmd == "alert": + from cortex.alert import main as alert_main + alert_main() + + elif cmd == "summarize": + from cortex.summarize import main as summarize_main + summarize_main() + + elif cmd == "anomaly": + from cortex.anomaly import main as anomaly_main + anomaly_main() + + elif cmd == "predict": + from cortex.predict import main as predict_main + predict_main() + + elif cmd == "monitor": + from cortex.monitor import main as monitor_main + monitor_main() + elif cmd in ("-h", "--help", "help"): print(__doc__.strip()) diff --git a/cortex/monitor.py b/cortex/monitor.py new file mode 100644 index 0000000..4a0d1ff --- /dev/null +++ b/cortex/monitor.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +"""Neural Monitor β€” Multi-Agent Event Stream Dashboard. + +Shows per-agent statistics from isolated NATS streams. + +Usage: + cortex monitor [--json] +""" + +import argparse +import json +import subprocess +import sys +from datetime import datetime +from pathlib import Path + +from cortex.config import cortex_home + +AGENTS = { + "main": {"name": "Claudia", "emoji": "πŸ›‘οΈ", "stream": "openclaw-events"}, + "mondo-assistant": {"name": "Mona", "emoji": "πŸŒ™", "stream": "events-mondo-assistant"}, + "vera": {"name": "Vera", "emoji": "πŸ”’", "stream": "events-vera"}, + "stella": {"name": "Stella", "emoji": "πŸ’°", "stream": "events-stella"}, + "viola": {"name": "Viola", "emoji": "βš™οΈ", "stream": "events-viola"}, +} + +NATS_BIN = str(Path.home() / "bin" / "nats") + + +def _nats_json(args: list) -> dict | None: + try: + r = subprocess.run([NATS_BIN] + args + ["--json"], + capture_output=True, text=True, timeout=10) + if r.returncode == 0: + return json.loads(r.stdout) + except Exception: + pass + return None + + +def get_stream_info(stream: str) -> dict: + info = _nats_json(["stream", "info", stream]) + if not info: + return {"messages": 0, "bytes": 0, "last_ts": None, "subjects": 0} + state = info.get("state", {}) + return { + "messages": state.get("messages", 0), + 
"bytes": state.get("bytes", 0), + "last_ts": state.get("last_ts"), + "subjects": state.get("num_subjects", 0), + } + + +def get_stream_subjects(stream: str) -> dict: + return _nats_json(["stream", "subjects", stream]) or {} + + +def format_bytes(b: int) -> str: + for unit in ("B", "KB", "MB", "GB"): + if b < 1024: + return f"{b:.1f} {unit}" + b /= 1024 + return f"{b:.1f} TB" + + +def format_age(ts_str: str | None) -> str: + if not ts_str or ts_str == "0001-01-01T00:00:00Z": + return "never" + try: + ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) + now = datetime.now(ts.tzinfo) + secs = (now - ts).total_seconds() + if secs < 0 or secs > 365 * 24 * 3600 * 100: + return "never" + if secs < 60: + return f"{int(secs)}s ago" + if secs < 3600: + return f"{int(secs / 60)}m ago" + if secs < 86400: + return f"{int(secs / 3600)}h ago" + return f"{int(secs / 86400)}d ago" + except Exception: + return "unknown" + + +def get_dashboard() -> list[dict]: + results = [] + for agent_id, cfg in AGENTS.items(): + info = get_stream_info(cfg["stream"]) + subjects = get_stream_subjects(cfg["stream"]) + + msg_in = sum(v for k, v in subjects.items() if "message_in" in k) + msg_out = sum(v for k, v in subjects.items() if "message_out" in k) + tool_calls = sum(v for k, v in subjects.items() if "tool_call" in k) + lifecycle = sum(v for k, v in subjects.items() if "lifecycle" in k) + + results.append({ + "agent_id": agent_id, + "name": cfg["name"], + "emoji": cfg["emoji"], + "stream": cfg["stream"], + "messages": info["messages"], + "bytes": info["bytes"], + "last_ts": info["last_ts"], + "msg_in": msg_in, + "msg_out": msg_out, + "tool_calls": tool_calls, + "lifecycle": lifecycle, + }) + return results + + +def format_dashboard(data: list[dict]) -> str: + lines = ["\n\033[1m🧠 NEURAL MONITOR - Multi-Agent Event Streams\033[0m", + f"\033[2m{'─' * 50}\033[0m\n"] + + total_events = total_bytes = active = 0 + for d in data: + total_events += d["messages"] + total_bytes += d["bytes"] + if 
d["messages"] > 0: + active += 1 + + lines.append(f"\033[1m{d['emoji']} {d['name']}\033[0m ({d['agent_id']})") + lines.append(f" Stream: {d['stream']}") + lines.append(f" Events: {d['messages']:,} ({format_bytes(d['bytes'])})") + lines.append(f" Last: {format_age(d['last_ts'])}") + lines.append(f" Types: πŸ“₯ {d['msg_in']} in | πŸ“€ {d['msg_out']} out | πŸ”§ {d['tool_calls']} tools | πŸ”„ {d['lifecycle']} lifecycle") + lines.append("") + + lines.append(f"\033[2m{'─' * 50}\033[0m") + lines.append(f"\033[1mπŸ“Š TOTAL:\033[0m {total_events:,} events ({format_bytes(total_bytes)}) across {active} active agents\n") + lines.append("\033[2mπŸ”’ Each agent can only access their own stream\033[0m\n") + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser(description="Neural Monitor β€” Multi-Agent Dashboard") + parser.add_argument("--json", action="store_true") + args = parser.parse_args() + + data = get_dashboard() + + if args.json: + print(json.dumps(data, indent=2)) + else: + print(format_dashboard(data)) + + +if __name__ == "__main__": + main() diff --git a/cortex/needs.py b/cortex/needs.py new file mode 100644 index 0000000..6e1ef36 --- /dev/null +++ b/cortex/needs.py @@ -0,0 +1,326 @@ +#!/usr/bin/env python3 +"""Needs System β€” Self-Monitoring & Self-Healing Loop. + +Monitors functional needs: context, health, energy, connection, growth. +Each need has sensors, thresholds, self-heal actions, and escalation. 
+ +Usage: + cortex needs [--json] [--quiet] +""" + +import argparse +import json +import subprocess +import sys +from dataclasses import asdict, dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Optional + +from cortex.config import cortex_home, memory_dir + + +@dataclass +class Need: + name: str + level: float # 0.0 (critical) to 1.0 (fully satisfied) + status: str # "satisfied", "low", "critical" + details: str + healed: list = field(default_factory=list) + escalate: list = field(default_factory=list) + + +@dataclass +class Wellbeing: + timestamp: str + overall: float + status: str + needs: dict + healed: list + escalations: list + history_trend: str + + +def _run_cmd(cmd, timeout=10): + try: + r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) + return r.returncode, r.stdout, r.stderr + except subprocess.TimeoutExpired: + return -1, "", "timeout" + except Exception as e: + return -1, "", str(e) + + +def _try_heal(action_name, cmd, timeout=30): + try: + r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) + if r.returncode == 0: + return True, f"βœ… {action_name}: success" + return False, f"❌ {action_name}: failed ({r.stderr[:80]})" + except Exception as e: + return False, f"❌ {action_name}: {e}" + + +def _classify(score): + return "satisfied" if score > 0.7 else "low" if score > 0.3 else "critical" + + +def wellbeing_file(): + return memory_dir() / "wellbeing.json" + + +def sense_context(): + score, details, healed, escalate = 1.0, [], [], [] + mem = memory_dir() + + working = mem / "WORKING.md" + if working.exists(): + age_h = (datetime.now().timestamp() - working.stat().st_mtime) / 3600 + if age_h > 8: + score -= 0.3 + details.append(f"WORKING.md stale ({age_h:.0f}h)") + elif age_h > 4: + score -= 0.1 + details.append(f"WORKING.md somewhat old ({age_h:.1f}h)") + else: + score -= 0.4 + details.append("WORKING.md missing") + + boot_ctx = mem / "BOOT_CONTEXT.md" + if 
boot_ctx.exists(): + age_h = (datetime.now().timestamp() - boot_ctx.stat().st_mtime) / 3600 + if age_h > 4: + score -= 0.15 + details.append(f"BOOT_CONTEXT.md stale ({age_h:.0f}h)") + else: + score -= 0.2 + details.append("BOOT_CONTEXT.md missing") + + home = cortex_home() + memory_md = home.parent / "clawd" / "MEMORY.md" if "cortex" not in str(home) else home / "MEMORY.md" + # Try standard location + for p in [Path.home() / "clawd" / "MEMORY.md", home / "MEMORY.md"]: + if p.exists(): + if p.stat().st_size < 100: + score -= 0.3 + details.append("MEMORY.md nearly empty") + break + else: + score -= 0.3 + details.append("MEMORY.md not found") + + if not details: + details.append("Context complete and fresh") + + return Need("context", max(0.0, score), _classify(max(0.0, score)), + "; ".join(details), healed, escalate) + + +def sense_health(): + score, details, healed, escalate = 1.0, [], [], [] + + rc, out, _ = _run_cmd(["systemctl", "--user", "is-active", "event-consumer"]) + if out.strip() != "active": + score -= 0.3 + details.append("Event Consumer inactive") + + nats_bin = str(Path.home() / "bin" / "nats") + rc, out, _ = _run_cmd([nats_bin, "server", "check", "connection"]) + if rc != 0: + score -= 0.4 + details.append("NATS unreachable") + escalate.append("NATS Server down") + + rc, out, _ = _run_cmd(["df", "--output=pcent", "/"]) + if rc == 0: + lines = out.strip().split('\n') + if len(lines) >= 2: + usage = int(lines[1].strip().rstrip('%')) + if usage > 90: + score -= 0.3 + details.append(f"Disk {usage}% full") + elif usage > 80: + score -= 0.1 + details.append(f"Disk {usage}% full") + + if not details: + details.append("All systems healthy") + + return Need("health", max(0.0, score), _classify(max(0.0, score)), + "; ".join(details), healed, escalate) + + +def sense_energy(): + score, details = 1.0, [] + + total_ctx_bytes = 0 + clawd = Path.home() / "clawd" + for f in ["SOUL.md", "AGENTS.md", "TOOLS.md", "MEMORY.md", "USER.md"]: + p = clawd / f + if p.exists(): 
+ total_ctx_bytes += p.stat().st_size + + total_ctx_kb = total_ctx_bytes / 1024 + if total_ctx_kb > 30: + score -= 0.2 + details.append(f"Workspace files {total_ctx_kb:.0f}KB β€” context pressure") + else: + details.append(f"Workspace files {total_ctx_kb:.0f}KB β€” efficient") + + if not details: + details.append("Energy budget good") + + return Need("energy", max(0.0, score), _classify(max(0.0, score)), + "; ".join(details), [], []) + + +def sense_connection(): + score, details = 1.0, [] + hour = datetime.now().hour + if 23 <= hour or hour < 8: + details.append("Night mode β€” no interaction expected") + else: + details.append("Connection status normal") + + return Need("connection", max(0.0, score), _classify(max(0.0, score)), + "; ".join(details), [], []) + + +def sense_growth(): + score, details = 1.0, [] + + rag_db = Path.home() / "clawd" / ".rag-db" / "chroma.sqlite3" + if rag_db.exists(): + age_days = (datetime.now().timestamp() - rag_db.stat().st_mtime) / 86400 + if age_days > 14: + score -= 0.3 + details.append(f"RAG Index {age_days:.0f} days old") + elif age_days > 7: + score -= 0.1 + details.append(f"RAG Index {age_days:.0f} days old") + else: + details.append(f"RAG Index fresh ({age_days:.1f} days)") + else: + score -= 0.3 + details.append("RAG DB missing") + + if not details: + details.append("Growth normal") + + return Need("growth", max(0.0, score), _classify(max(0.0, score)), + "; ".join(details), [], []) + + +def assess_wellbeing() -> Wellbeing: + needs = {} + all_healed, all_escalations = [], [] + + for sensor in [sense_context, sense_health, sense_energy, sense_connection, sense_growth]: + try: + need = sensor() + needs[need.name] = need + all_healed.extend(need.healed) + all_escalations.extend(need.escalate) + except Exception as e: + name = sensor.__name__.replace("sense_", "") + needs[name] = Need(name, 0.5, "unknown", f"Sensor error: {e}") + + weights = {"context": 0.3, "health": 0.3, "energy": 0.15, "connection": 0.1, "growth": 0.15} + 
overall = sum(needs[n].level * weights.get(n, 0.2) for n in needs) / sum(weights.get(n, 0.2) for n in needs) + + if overall > 0.8: + status = "thriving" + elif overall > 0.6: + status = "okay" + elif overall > 0.3: + status = "struggling" + else: + status = "critical" + + trend = _compute_trend(overall) + + return Wellbeing( + timestamp=datetime.now().isoformat(), + overall=round(overall, 2), + status=status, + needs={n: asdict(need) for n, need in needs.items()}, + healed=[h for h in all_healed if h], + escalations=all_escalations, + history_trend=trend, + ) + + +def _compute_trend(current): + try: + wf = wellbeing_file() + if wf.exists(): + history = json.loads(wf.read_text()) + past = [h.get("overall", 0.5) for h in history.get("history", [])] + if past: + avg = sum(past[-5:]) / len(past[-5:]) + if current > avg + 0.1: + return "improving" + elif current < avg - 0.1: + return "declining" + return "stable" + except Exception: + return "unknown" + + +def save_wellbeing(wb: Wellbeing): + data = asdict(wb) + wf = wellbeing_file() + wf.parent.mkdir(parents=True, exist_ok=True) + + history = [] + if wf.exists(): + try: + history = json.loads(wf.read_text()).get("history", []) + except Exception: + pass + + history.append({"timestamp": wb.timestamp, "overall": wb.overall, "status": wb.status}) + data["history"] = history[-48:] + wf.write_text(json.dumps(data, indent=2, ensure_ascii=False)) + + +def format_status(wb: Wellbeing) -> str: + emoji = {"thriving": "🌟", "okay": "😊", "struggling": "😟", "critical": "πŸ†˜"}.get(wb.status, "❓") + trend_emoji = {"improving": "πŸ“ˆ", "stable": "➑️", "declining": "πŸ“‰"}.get(wb.history_trend, "❓") + + lines = [f"{emoji} Wellbeing: {wb.overall:.0%} ({wb.status}) {trend_emoji} {wb.history_trend}", ""] + + need_emoji = {"context": "🧠", "health": "πŸ’Š", "energy": "⚑", "connection": "πŸ’¬", "growth": "🌱"} + for name, data in wb.needs.items(): + ne = need_emoji.get(name, "β€’") + bar = "β–ˆ" * int(data["level"] * 10) + "β–‘" * (10 - 
int(data["level"] * 10)) + lines.append(f" {ne} {name:12s} [{bar}] {data['level']:.0%} β€” {data['details']}") + + if wb.healed: + lines.extend(["", "πŸ”§ Self-Healed:"] + [f" {h}" for h in wb.healed]) + if wb.escalations: + lines.extend(["", "⚠️ Need attention:"] + [f" β†’ {e}" for e in wb.escalations]) + + return "\n".join(lines) + + +def main(): + parser = argparse.ArgumentParser(description="Needs System β€” Self-Monitoring") + parser.add_argument("--json", action="store_true") + parser.add_argument("--quiet", "-q", action="store_true") + args = parser.parse_args() + + wb = assess_wellbeing() + save_wellbeing(wb) + + if args.json: + print(json.dumps(asdict(wb), indent=2, ensure_ascii=False)) + elif args.quiet: + if wb.status != "thriving" or wb.escalations: + print(format_status(wb)) + else: + print(format_status(wb)) + + +if __name__ == "__main__": + main() diff --git a/cortex/predict.py b/cortex/predict.py new file mode 100644 index 0000000..475304a --- /dev/null +++ b/cortex/predict.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +"""Predictive Actions β€” Pattern-based proactive suggestions. + +Analyzes behavior patterns and predicts what user might need next. 
+ +Usage: + cortex predict [--learn] [--patterns] [--json] +""" + +import argparse +import base64 +import json +import re +import subprocess +import sys +from datetime import datetime +from pathlib import Path + +from cortex.config import cortex_home, memory_dir + + +def patterns_file(): + return memory_dir() / "behavior-patterns.json" + + +def _load_patterns(): + pf = patterns_file() + if pf.exists(): + try: + return json.loads(pf.read_text()) + except Exception: + pass + return {"timePatterns": {}, "sequences": {}, "recurring": [], "lastUpdated": None} + + +def _save_patterns(patterns): + pf = patterns_file() + pf.parent.mkdir(parents=True, exist_ok=True) + patterns["lastUpdated"] = datetime.now().isoformat() + pf.write_text(json.dumps(patterns, indent=2)) + + +def fetch_events(hours: int = 168) -> list: + """Fetch events for learning (default 1 week).""" + nats = str(Path.home() / "bin" / "nats") + cutoff_ms = int((datetime.now().timestamp() - hours * 3600) * 1000) + events = [] + + try: + r = subprocess.run([nats, "stream", "info", "openclaw-events", "--json"], + capture_output=True, text=True, timeout=10) + if r.returncode != 0: + return events + info = json.loads(r.stdout) + end_seq = info["state"]["last_seq"] + start_seq = max(info["state"]["first_seq"], end_seq - 10000) + step = max(1, (end_seq - start_seq) // 2000) + + for seq in range(start_seq, end_seq + 1, step): + try: + r = subprocess.run( + [nats, "stream", "get", "openclaw-events", str(seq), "--json"], + capture_output=True, text=True, timeout=2, + ) + if r.returncode != 0: + continue + msg = json.loads(r.stdout) + data = json.loads(base64.b64decode(msg["data"]).decode("utf-8")) + ts = data.get("timestamp") or data.get("timestampMs", 0) + if isinstance(ts, (int, float)) and ts > 1e12: + ts_s = ts / 1000 + elif isinstance(ts, (int, float)): + ts_s = ts + else: + continue + + if ts_s * 1000 > cutoff_ms: + events.append({ + "time": datetime.fromtimestamp(ts_s), + "type": data.get("type", "unknown"), + 
"text": (data.get("payload", {}).get("data", {}).get("text", "") or "")[:200], + "tool": data.get("payload", {}).get("data", {}).get("name", ""), + "agent": data.get("agent", "main"), + }) + except Exception: + continue + except Exception: + pass + + return sorted(events, key=lambda e: e["time"]) + + +def categorize_activity(event: dict) -> str: + text = event["text"].lower() + if any(w in text for w in ("email", "mail", "inbox")): + return "email" + if any(w in text for w in ("calendar", "meeting", "termin")): + return "calendar" + if any(w in text for w in ("git", "commit", "push")): + return "git" + if any(w in text for w in ("search", "web_search")): + return "search" + if any(w in text for w in ("mondo", "mygate", "fintech")): + return "mondo-gate" + if event["tool"] == "exec": + return "shell" + if event["tool"] in ("read", "write"): + return "files" + if "message" in event["type"]: + return "chat" + return "other" + + +def learn_patterns(events: list) -> dict: + patterns = _load_patterns() + patterns["timePatterns"] = {} + patterns["sequences"] = {} + + last_activity = None + for event in events: + hour = event["time"].hour + dow = event["time"].weekday() + activity = categorize_activity(event) + key = f"{dow}-{hour}" + + patterns["timePatterns"].setdefault(key, {}) + patterns["timePatterns"][key][activity] = patterns["timePatterns"][key].get(activity, 0) + 1 + + if last_activity and last_activity != activity: + patterns["sequences"].setdefault(last_activity, {}) + patterns["sequences"][last_activity][activity] = \ + patterns["sequences"][last_activity].get(activity, 0) + 1 + + last_activity = activity + + return patterns + + +def predict_actions(patterns: dict) -> list: + now = datetime.now() + key = f"{now.weekday()}-{now.hour}" + predictions = [] + + time_activities = patterns["timePatterns"].get(key, {}) + for activity, count in sorted(time_activities.items(), key=lambda x: -x[1])[:3]: + if count >= 3: + predictions.append({ + "type": "time-based", + 
"activity": activity, + "confidence": min(0.9, count / 10), + "reason": f"You often do this at this time", + }) + + return predictions + + +SUGGESTIONS = { + "email": "Check emails?", + "calendar": "Review calendar?", + "git": "Check git status?", + "search": "Need to research something?", + "mondo-gate": "Work on Mondo Gate?", + "shell": "Run system checks?", + "files": "Edit documentation or notes?", + "chat": "Check messages?", +} + + +def main(): + parser = argparse.ArgumentParser(description="Predictive Actions") + parser.add_argument("--learn", action="store_true") + parser.add_argument("--patterns", action="store_true") + parser.add_argument("--json", action="store_true") + args = parser.parse_args() + + if args.learn: + print("πŸ“š Learning patterns from last 7 days...\n") + events = fetch_events(168) + print(f" Found {len(events)} events\n") + patterns = learn_patterns(events) + _save_patterns(patterns) + print(f"βœ… Patterns learned!") + print(f" Time patterns: {len(patterns['timePatterns'])} time slots") + print(f" Sequences: {len(patterns['sequences'])} transitions") + return + + if args.patterns: + patterns = _load_patterns() + if args.json: + print(json.dumps(patterns, indent=2, default=str)) + else: + print("πŸ“Š Learned Patterns:\n") + now = datetime.now() + for h in range(8, 23): + key = f"{now.weekday()}-{h}" + acts = patterns["timePatterns"].get(key, {}) + if acts: + top = ", ".join(f"{a}({c})" for a, c in sorted(acts.items(), key=lambda x: -x[1])[:2]) + print(f" {h}:00 β†’ {top}") + return + + # Default: predict + patterns = _load_patterns() + if not patterns["lastUpdated"]: + print("⚠️ No patterns learned yet. 
Run with --learn first.") + return + + predictions = predict_actions(patterns) + + if args.json: + print(json.dumps(predictions, indent=2)) + return + + if not predictions: + print("πŸ€” No strong predictions for this time.") + return + + print(f"πŸ“ Now: {datetime.now().strftime('%H:%M')}\n") + print("Based on your patterns:\n") + for p in predictions: + conf = int(p["confidence"] * 100) + bar = "β–ˆ" * (conf // 10) + "β–‘" * (10 - conf // 10) + print(f" {bar} {conf}% {p['activity']}") + print(f" πŸ’‘ {SUGGESTIONS.get(p['activity'], p['activity'] + '?')}") + print(f" πŸ“ {p['reason']}\n") + + +if __name__ == "__main__": + main() diff --git a/cortex/summarize.py b/cortex/summarize.py new file mode 100644 index 0000000..b333b70 --- /dev/null +++ b/cortex/summarize.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +"""Daily Conversation Summarizer β€” reads events from NATS, summarizes via LLM. + +Usage: + cortex summarize [--date YYYY-MM-DD] [--model gemma2:27b] [--dry-run] +""" + +import argparse +import base64 +import json +import subprocess +import sys +import time +from datetime import datetime, timedelta +from pathlib import Path + +from cortex.config import cortex_home + +NATS_BIN = str(Path.home() / "bin" / "nats") +OLLAMA_URL = "http://desktop01:11434/api/generate" +STREAM = "openclaw-events" + +SUMMARY_PROMPT = """You are a personal assistant summarizing a day's conversations. +Write a concise daily summary in German. Focus on: + +1. **Entscheidungen** β€” Was wurde beschlossen? +2. **Fortschritte** β€” Was wurde gebaut/erledigt? +3. **Probleme** β€” Was ging schief, was ist offen? +4. **Personen** β€” Wer war beteiligt, neue Kontakte? +5. **NΓ€chste Schritte** β€” Was steht an? + +Keep it under 500 words. Use bullet points. Skip heartbeats and system noise. 
+ +CONVERSATIONS FROM {date}: +{conversations} + +DAILY SUMMARY (auf Deutsch):""" + + +def warm_dir(): + return cortex_home() / "brain" / "warm" + + +def get_day_events(date_str: str) -> list: + day = datetime.strptime(date_str, "%Y-%m-%d") + day_start = day.timestamp() + day_end = (day + timedelta(days=1)).timestamp() + + r = subprocess.run([NATS_BIN, "stream", "info", STREAM, "-j"], + capture_output=True, text=True) + if r.returncode != 0: + return [] + + info = json.loads(r.stdout) + total = info["state"]["messages"] + first_seq = info["state"]["first_seq"] + last_seq = info["state"]["last_seq"] + + first_ts = datetime.fromisoformat( + info["state"]["first_ts"].replace("Z", "+00:00")).timestamp() + last_ts = datetime.fromisoformat( + info["state"]["last_ts"].replace("Z", "+00:00")).timestamp() + days_active = max((last_ts - first_ts) / 86400, 1) + events_per_day = total / days_active + + days_from_start = (day_start - first_ts) / 86400 + est_start = max(first_seq, int(first_seq + days_from_start * events_per_day - events_per_day * 0.1)) + est_end = min(last_seq, int(est_start + events_per_day * 1.2)) + + events = [] + in_range = False + + for seq in range(est_start, est_end + 1): + r = subprocess.run([NATS_BIN, "stream", "get", STREAM, str(seq), "-j"], + capture_output=True, text=True, timeout=3) + if r.returncode != 0: + continue + try: + data = json.loads(r.stdout) + if "conversation_message" not in data.get("subject", ""): + continue + + raw = data.get("data", "") + if not raw: + continue + + decoded = json.loads(base64.b64decode(raw).decode("utf-8")) + ts = decoded.get("timestamp", 0) + if isinstance(ts, (int, float)) and ts > 1e12: + ts = ts / 1000 + + if ts < day_start: + continue + if ts > day_end: + if in_range: + break + continue + + in_range = True + payload = decoded.get("payload", {}) + + text = "" + tp = payload.get("text_preview", "") + if isinstance(tp, str): + text = tp + elif isinstance(tp, list): + text = " ".join(i.get("text", "") for i in tp 
if isinstance(i, dict)) + if not text: + pd = payload.get("data", {}) + if isinstance(pd, dict): + text = pd.get("text", "") or pd.get("content", "") + if not text: + text = payload.get("text", "") or payload.get("content", "") + + text = text.strip() + if not text or len(text) < 20: + continue + if "HEARTBEAT" in text or text.startswith("[cron:"): + continue + + hour = datetime.fromtimestamp(ts).strftime("%H:%M") + agent = decoded.get("agent", "?") + etype = decoded.get("type", "") + direction = "β†’" if "message_out" in etype else "←" + + events.append({"time": hour, "agent": agent, "direction": direction, "text": text[:300]}) + except Exception: + continue + + return events + + +def summarize_with_llm(events: list, date_str: str, model: str = "gemma2:27b") -> str: + conv_text = "" + for ev in events: + conv_text += f"[{ev['time']}] {ev['direction']} ({ev['agent']}) {ev['text'][:200]}\n" + if len(conv_text) > 4000: + conv_text += f"\n... and {len(events)} more messages" + break + + prompt = SUMMARY_PROMPT.format(date=date_str, conversations=conv_text) + payload = { + "model": model, + "prompt": prompt, + "stream": False, + "options": {"temperature": 0.3, "num_predict": 1500}, + } + + try: + r = subprocess.run( + ["curl", "-s", "-m", "90", OLLAMA_URL, "-d", json.dumps(payload)], + capture_output=True, text=True, timeout=95, + ) + return json.loads(r.stdout).get("response", "").strip() + except Exception as e: + return f"⚠️ Summary generation failed: {e}" + + +def main(): + parser = argparse.ArgumentParser(description="Daily Conversation Summarizer") + parser.add_argument("--date", type=str, help="Date to summarize (YYYY-MM-DD), default: yesterday") + parser.add_argument("--model", type=str, default="gemma2:27b") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + date_str = args.date or (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") + + print(f"🌑️ Daily Summarizer β€” {date_str}") + print("=" * 50) + print(f"πŸ“₯ 
Fetching events for {date_str}...")

    t0 = time.time()
    events = get_day_events(date_str)
    print(f" Found {len(events)} conversation events in {time.time() - t0:.1f}s")

    if not events:
        print("❌ No events found for this date.")
        return

    if args.dry_run:
        print("\nπŸ“ Events:")
        for ev in events[:30]:
            print(f" [{ev['time']}] {ev['direction']} ({ev['agent']}) {ev['text'][:100]}")
        if len(events) > 30:
            print(f" ... +{len(events) - 30} more")
        return

    print(f"\nπŸ€– Generating summary with {args.model}...")
    t0 = time.time()
    summary = summarize_with_llm(events, date_str, args.model)
    print(f" Done in {time.time() - t0:.1f}s")

    wd = warm_dir()
    wd.mkdir(parents=True, exist_ok=True)
    output_file = wd / f"{date_str}.md"

    content = f"# Daily Summary: {date_str}\n"
    content += f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
    content += f"Events: {len(events)} conversations\nModel: {args.model}\n\n"
    content += summary

    output_file.write_text(content)
    print(f"\nπŸ“ Written to {output_file}")
    print(f"\n{summary[:500]}...")


if __name__ == "__main__":
    main()
diff --git a/tests/test_new_modules.py b/tests/test_new_modules.py
index bd71d8a..c36caa3 100644
--- a/tests/test_new_modules.py
+++ b/tests/test_new_modules.py
@@ -434,3 +434,246 @@ class TestIntegration:
 if __name__ == '__main__':
     pytest.main([__file__, '-v'])


# --- Needs Module Tests ---

class TestNeeds:
    """Tests for cortex.needs module."""

    def test_import(self):
        from cortex import needs
        assert hasattr(needs, 'assess_wellbeing')
        assert hasattr(needs, 'format_status')

    def test_classify(self):
        # _classify maps a 0..1 score onto the three wellbeing buckets.
        from cortex.needs import _classify
        assert _classify(0.9) == "satisfied"
        assert _classify(0.5) == "low"
        assert _classify(0.1) == "critical"

    def test_assess_wellbeing(self, temp_cortex_home):
        # temp_cortex_home fixture isolates state under a temporary CORTEX home.
        from cortex.needs import assess_wellbeing
        wb = assess_wellbeing()
        assert 0.0 <= wb.overall <= 1.0
        assert wb.status in ("thriving", "okay", "struggling", "critical")
        assert "context" in wb.needs
        assert "health" in wb.needs

    def test_save_and_load(self, temp_cortex_home):
        # Round-trip: save_wellbeing must persist JSON with overall + history keys.
        from cortex.needs import assess_wellbeing, save_wellbeing, wellbeing_file
        wb = assess_wellbeing()
        save_wellbeing(wb)
        wf = wellbeing_file()
        assert wf.exists()
        data = json.loads(wf.read_text())
        assert "overall" in data
        assert "history" in data

    def test_format_status(self):
        from cortex.needs import assess_wellbeing, format_status
        wb = assess_wellbeing()
        output = format_status(wb)
        assert "Wellbeing" in output


# --- Alert Module Tests ---

class TestAlert:
    """Tests for cortex.alert module."""

    def test_import(self):
        from cortex import alert
        assert hasattr(alert, 'Alert')
        assert hasattr(alert, 'format_dashboard')

    def test_alert_creation(self):
        from cortex.alert import Alert
        a = Alert("test", "critical", "Test alert")
        assert a.source == "test"
        assert a.level == "critical"
        d = a.to_dict()
        assert d["source"] == "test"

    def test_format_dashboard_empty(self):
        from cortex.alert import format_dashboard
        output = format_dashboard([])
        assert "All clear" in output

    def test_format_dashboard_with_alerts(self):
        from cortex.alert import Alert, format_dashboard
        alerts = [Alert("test", "critical", "Something broke")]
        output = format_dashboard(alerts)
        assert "Critical" in output
        assert "Something broke" in output


# --- Summarize Module Tests ---

class TestSummarize:
    """Tests for cortex.summarize module."""

    def test_import(self):
        from cortex import summarize
        assert hasattr(summarize, 'get_day_events')
        assert hasattr(summarize, 'warm_dir')

    def test_warm_dir(self, temp_cortex_home):
        from cortex.summarize import warm_dir
        wd = warm_dir()
        assert "brain" in str(wd)
        assert "warm" in str(wd)


# --- Anomaly Module Tests ---

class TestAnomaly:
    """Tests for cortex.anomaly module."""

    def test_import(self):
        from cortex import anomaly
        assert hasattr(anomaly, 'detect_anomalies')

    def test_detect_no_events(self):
        from cortex.anomaly import detect_anomalies
        assert detect_anomalies([]) == []

    def test_detect_few_events(self):
        # Below the minimum sample size, no anomalies should be reported.
        from cortex.anomaly import detect_anomalies
        events = [{"type": "msg", "text": "hi", "tool": "", "isError": False}] * 5
        assert detect_anomalies(events) == []

    def test_detect_error_spike(self):
        from cortex.anomaly import detect_anomalies
        events = [{"type": "error", "text": "error occurred", "tool": "", "isError": True}] * 15
        anomalies = detect_anomalies(events)
        assert any(a["type"] == "error_spike" for a in anomalies)

    def test_format_report_clean(self):
        from cortex.anomaly import format_report
        assert "No anomalies" in format_report([])

    def test_state_file(self, temp_cortex_home):
        from cortex.anomaly import state_file, _load_state, _save_state
        state = _load_state()
        assert "lastCheck" in state
        _save_state(state)
        assert state_file().exists()


# --- Predict Module Tests ---

class TestPredict:
    """Tests for cortex.predict module."""

    def test_import(self):
        from cortex import predict
        assert hasattr(predict, 'predict_actions')
        assert hasattr(predict, 'learn_patterns')

    def test_categorize_activity(self):
        from cortex.predict import categorize_activity
        assert categorize_activity({"text": "check email inbox", "tool": "", "type": "msg"}) == "email"
        assert categorize_activity({"text": "git commit", "tool": "", "type": "msg"}) == "git"
        assert categorize_activity({"text": "something", "tool": "exec", "type": "tool"}) == "shell"

    def test_load_empty_patterns(self, temp_cortex_home):
        from cortex.predict import _load_patterns
        p = _load_patterns()
        assert "timePatterns" in p
        assert "sequences" in p

    def test_learn_patterns(self):
        from cortex.predict import learn_patterns
        from datetime import datetime
        events = [
            {"time": datetime(2026, 1, 1, 9, 0), "type": "msg", "text": "check email", "tool": "", "agent": "main"},
            {"time": datetime(2026, 1, 1, 9, 5), "type": "msg", "text": "git push", "tool": "", "agent": "main"},
        ]
        patterns = learn_patterns(events)
        assert len(patterns["timePatterns"]) > 0

    def test_predict_empty(self):
        from cortex.predict import predict_actions
        preds = predict_actions({"timePatterns": {}, "sequences": {}})
        assert preds == []


# --- Monitor Module Tests ---

class TestMonitor:
    """Tests for cortex.monitor module."""

    def test_import(self):
        from cortex import monitor
        assert hasattr(monitor, 'get_dashboard')
        assert hasattr(monitor, 'AGENTS')

    def test_agents_config(self):
        from cortex.monitor import AGENTS
        assert "main" in AGENTS
        assert "stream" in AGENTS["main"]

    def test_format_bytes(self):
        from cortex.monitor import format_bytes
        assert "KB" in format_bytes(2048)
        assert "MB" in format_bytes(2 * 1024 * 1024)
        assert "B" in format_bytes(100)

    def test_format_age_none(self):
        # The zero-value Go timestamp must read as "never", same as None.
        from cortex.monitor import format_age
        assert format_age(None) == "never"
        assert format_age("0001-01-01T00:00:00Z") == "never"

    def test_format_dashboard(self):
        from cortex.monitor import format_dashboard
        data = [{
            "agent_id": "main", "name": "Claudia", "emoji": "πŸ›‘οΈ",
            "stream": "openclaw-events", "messages": 100, "bytes": 1024,
            "last_ts": None, "msg_in": 50, "msg_out": 40, "tool_calls": 10, "lifecycle": 0,
        }]
        output = format_dashboard(data)
        assert "Claudia" in output
        assert "NEURAL MONITOR" in output


# --- CLI Integration Tests for New Modules ---

class TestNewModulesCLI:
    """CLI integration tests for the 6 new modules."""
    # Smoke tests only: each module must be runnable via `python -m` and
    # exit 0 for --help (i.e. argparse wiring is intact).

    def test_cli_needs_help(self):
        import subprocess
        r = subprocess.run(['python3', '-m', 'cortex.needs', '--help'],
                           capture_output=True, text=True)
        assert r.returncode == 0

    def test_cli_alert_help(self):
        import subprocess
        r = subprocess.run(['python3', '-m', 'cortex.alert', '--help'],
                           capture_output=True, text=True)
        assert r.returncode == 0

    def test_cli_summarize_help(self):
        import subprocess
        r = subprocess.run(['python3', '-m', 'cortex.summarize', '--help'],
                           capture_output=True, text=True)
        assert r.returncode == 0

    def test_cli_anomaly_help(self):
        import subprocess
        r = subprocess.run(['python3', '-m', 'cortex.anomaly', '--help'],
                           capture_output=True, text=True)
        assert r.returncode == 0

    def test_cli_predict_help(self):
        import subprocess
        r = subprocess.run(['python3', '-m', 'cortex.predict', '--help'],
                           capture_output=True, text=True)
        assert r.returncode == 0

    def test_cli_monitor_help(self):
        import subprocess
        r = subprocess.run(['python3', '-m', 'cortex.monitor', '--help'],
                           capture_output=True, text=True)
        assert r.returncode == 0