#!/usr/bin/env python3
"""Anomaly Detector — Real-time pattern detection and alerting.

Detects error spikes, tool failures, repeated questions, security patterns.

Usage: cortex anomaly [--hours N] [--json] [--alert]
"""
import argparse
import base64
import json
import re
import subprocess
import sys
from collections import Counter
from datetime import datetime
from pathlib import Path

from cortex.config import cortex_home, memory_dir

# Tunable detection thresholds.
# NOTE(review): "securityKeywords" is not actually consulted by
# detect_anomalies() — the credential regex below hard-codes its own keyword
# list (and omits "api key" / "credential"). Kept for backward compatibility;
# unify if the security check is ever made configurable.
THRESHOLDS = {
    "errorRate": 0.1,           # fraction of events that may be errors
    "toolFailureRate": 0.2,     # fraction of tool calls that may fail
    "repeatedQuestions": 3,     # identical user messages before flagging
    "securityKeywords": ["password", "credential", "token", "api key", "secret"],
}

# Compiled once at import time — detect_anomalies() applies this per event, so
# hoisting avoids recompiling inside the loop. Matches "password=<8+ chars>",
# "token: <8+ chars>", etc.
_CREDENTIAL_RE = re.compile(
    r"(?:password|token|key|secret)[=:]\s*[^\s]{8,}", re.IGNORECASE
)


def state_file():
    """Return the path of the persisted anomaly-detector state file."""
    return memory_dir() / "anomaly-state.json"


def _load_state():
    """Load persisted state, falling back to a fresh default.

    A missing, unreadable, or corrupt state file is non-fatal: the detector
    simply starts from scratch.
    """
    sf = state_file()
    if sf.exists():
        try:
            return json.loads(sf.read_text())
        except Exception:
            pass  # best-effort: corrupt state is discarded, not raised
    # NOTE(review): "alertedAnomalies" is initialized but never read or
    # updated anywhere in this file — presumably reserved for --alert
    # de-duplication. TODO: wire up or remove.
    return {"lastCheck": 0, "alertedAnomalies": []}


def _save_state(state):
    """Persist *state* as pretty-printed JSON, creating parent dirs as needed."""
    sf = state_file()
    sf.parent.mkdir(parents=True, exist_ok=True)
    sf.write_text(json.dumps(state, indent=2))


def fetch_events(hours: int = 1) -> list:
    """Fetch recent events from the ``openclaw-events`` NATS stream.

    Reads at most the last 200 stream sequences via the ``nats`` CLI
    (assumed at ``~/bin/nats`` — TODO confirm), decodes each base64 message
    payload, and keeps events newer than *hours* ago. Timestamps given in
    seconds are normalized to milliseconds.

    Returns a list of flat dicts with keys: time, type, text, tool, isError,
    agent. All failures (CLI missing, timeouts, bad JSON) are swallowed and
    yield a partial or empty list — this is deliberately best-effort.
    """
    nats = str(Path.home() / "bin" / "nats")
    cutoff_ms = int((datetime.now().timestamp() - hours * 3600) * 1000)
    events = []
    try:
        r = subprocess.run(
            [nats, "stream", "info", "openclaw-events", "--json"],
            capture_output=True, text=True, timeout=10,
        )
        if r.returncode != 0:
            return events
        info = json.loads(r.stdout)
        end_seq = info["state"]["last_seq"]
        # Cap the scan at the last 200 sequences to bound CLI invocations.
        start_seq = max(info["state"]["first_seq"], end_seq - 200)
        for seq in range(start_seq, end_seq + 1):
            try:
                r = subprocess.run(
                    [nats, "stream", "get", "openclaw-events", str(seq), "--json"],
                    capture_output=True, text=True, timeout=2,
                )
                if r.returncode != 0:
                    continue
                msg = json.loads(r.stdout)
                data = json.loads(base64.b64decode(msg["data"]).decode("utf-8"))
                ts = data.get("timestamp") or data.get("timestampMs", 0)
                # Heuristic: values > 1e12 are already epoch-milliseconds;
                # smaller numeric values are epoch-seconds and get scaled.
                if isinstance(ts, (int, float)) and ts > 1e12:
                    pass  # already ms
                elif isinstance(ts, (int, float)):
                    ts = int(ts * 1000)
                if ts > cutoff_ms:
                    payload = data.get("payload", {}).get("data", {})
                    events.append({
                        "time": ts,
                        "type": data.get("type", "unknown"),
                        "text": (payload.get("text", "") or ""),
                        "tool": payload.get("name", ""),
                        "isError": payload.get("isError", False),
                        "agent": data.get("agent", "unknown"),
                    })
            except Exception:
                continue  # skip undecodable/slow messages, keep scanning
    except Exception:
        pass  # NATS unavailable — return whatever was collected
    return events


def detect_anomalies(events: list) -> list:
    """Scan *events* for anomalous patterns.

    Checks four patterns: overall error rate, tool failure rate, repeated
    user questions, and credential-looking strings in event text. Requires
    at least 10 events; otherwise returns an empty list.

    Returns a list of dicts with keys: type, severity, message.
    """
    anomalies = []
    if len(events) < 10:
        return anomalies

    # --- Error rate across all events ---
    error_count = sum(
        1 for e in events
        if "error" in e["type"] or e["isError"] or "error" in e["text"].lower()
    )
    error_rate = error_count / len(events)
    if error_rate > THRESHOLDS["errorRate"]:
        anomalies.append({
            "type": "error_spike",
            "severity": "critical" if error_rate > 0.3 else "warning",
            "message": f"High error rate: {error_rate:.1%} ({error_count}/{len(events)} events)",
        })

    # --- Tool failure rate (only meaningful with >5 tool calls) ---
    tool_events = [e for e in events if e["tool"]]
    tool_errors = [e for e in tool_events if e["isError"] or "error" in e["text"].lower()]
    if len(tool_events) > 5:
        fail_rate = len(tool_errors) / len(tool_events)
        if fail_rate > THRESHOLDS["toolFailureRate"]:
            anomalies.append({
                "type": "tool_failures",
                "severity": "warning",
                "message": f"High tool failure rate: {fail_rate:.1%}",
            })

    # --- Repeated user questions (possible frustration signal) ---
    # Events whose type contains both "message" and "in" (e.g. "message.in")
    # are treated as inbound user messages — TODO confirm against emitter.
    user_msgs = [
        e["text"].lower().strip()
        for e in events
        if "message" in e["type"] and "in" in e["type"]
    ]
    # Normalize: drop trivial messages, strip punctuation, truncate to 50
    # chars so near-identical questions collapse to one key.
    counts = Counter(
        re.sub(r"[?!.,]", "", msg)[:50] for msg in user_msgs if len(msg) > 10
    )
    repeated = [
        (k, c) for k, c in counts.items() if c >= THRESHOLDS["repeatedQuestions"]
    ]
    if repeated:
        # len(repeated) is the number of DISTINCT questions that were
        # repeated, not the repetition count — message worded accordingly.
        anomalies.append({
            "type": "repeated_questions",
            "severity": "warning",
            "message": (
                f"User repeated {len(repeated)} question(s) "
                f"{THRESHOLDS['repeatedQuestions']}+ times — possible frustration"
            ),
        })

    # --- Credential exposure (first match only) ---
    for e in events:
        if _CREDENTIAL_RE.search(e["text"]):
            anomalies.append({
                "type": "security_exposure",
                "severity": "critical",
                "message": "Potential credential exposure detected",
            })
            break

    return anomalies


def format_report(anomalies: list) -> str:
    """Render *anomalies* as a human-readable, emoji-decorated report."""
    if not anomalies:
        return "✅ No anomalies detected — all patterns normal"
    lines = [f"⚠️ Found {len(anomalies)} anomalies:\n"]
    for a in anomalies:
        icon = {"critical": "🔴", "warning": "🟡"}.get(a["severity"], "🔵")
        lines.append(f"{icon} [{a['severity'].upper()}] {a['type']}")
        lines.append(f"   {a['message']}\n")
    return "\n".join(lines)


def main():
    """CLI entry point: fetch events, detect anomalies, print, persist state."""
    parser = argparse.ArgumentParser(description="Anomaly Detector")
    parser.add_argument("--hours", type=int, default=1)
    parser.add_argument("--json", action="store_true")
    # NOTE(review): --alert is accepted but never acted upon below.
    # TODO: implement alert dispatch or drop the flag.
    parser.add_argument("--alert", action="store_true")
    args = parser.parse_args()

    print("🚨 Anomaly Detector\n")
    state = _load_state()
    print(f"📥 Analyzing last {args.hours}h of events...")
    events = fetch_events(args.hours)
    print(f"   Found {len(events)} events\n")
    if len(events) < 10:
        print("✅ Not enough events to analyze")
        return

    anomalies = detect_anomalies(events)
    if args.json:
        # NOTE(review): headers above are printed unconditionally, so --json
        # output is not machine-pure; kept as-is for backward compatibility.
        print(json.dumps(anomalies, indent=2))
    else:
        print(format_report(anomalies))

    state["lastCheck"] = int(datetime.now().timestamp() * 1000)
    state["lastAnomalies"] = anomalies
    _save_state(state)


if __name__ == "__main__":
    main()