darkplex-core/cortex/anomaly.py
Claudia fda607c204
All checks were successful
Tests / test (push) Successful in 4s
fix: sync missing import os + stray } from darkplex-core PR #2 (YesMan)
2026-02-11 20:25:29 +01:00

195 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""Anomaly Detector — Real-time pattern detection and alerting.
Detects error spikes, tool failures, repeated questions, security patterns.
Usage:
cortex anomaly [--hours N] [--json] [--alert]
"""
import argparse
import base64
import json
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from cortex.config import cortex_home, memory_dir
# Tunable detection thresholds, read by detect_anomalies().
THRESHOLDS = {
    "errorRate": 0.1,  # alert when more than 10% of events look like errors
    "toolFailureRate": 0.2,  # alert when more than 20% of tool calls fail
    "repeatedQuestions": 3,  # same inbound question seen at least this many times
    # NOTE(review): securityKeywords looks unused — the security check in
    # detect_anomalies() hard-codes its own regex keywords; confirm and reconcile.
    "securityKeywords": ["password", "credential", "token", "api key", "secret"],
}
def state_file():
    """Return the path of the persisted anomaly-detector state file."""
    # State lives alongside the rest of cortex memory.
    return memory_dir().joinpath("anomaly-state.json")
def _load_state():
    """Read persisted detector state, or a fresh default if unavailable.

    State is best-effort: a missing, unreadable, or corrupt file silently
    falls back to the default rather than aborting the run.
    """
    default = {"lastCheck": 0, "alertedAnomalies": []}
    path = state_file()
    if not path.exists():
        return default
    try:
        return json.loads(path.read_text())
    except Exception:
        # Corrupt or unreadable state is non-fatal; start over.
        return default
def _save_state(state):
    """Persist detector state as pretty-printed JSON, creating parent dirs."""
    path = state_file()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(state, indent=2))
def fetch_events(hours: int = 1) -> list:
    """Pull recent events from the `openclaw-events` NATS stream.

    Shells out to the local `nats` CLI (~/bin/nats), walking at most the
    last ~200 sequence numbers and keeping messages newer than `hours` ago.
    Every failure — missing CLI, bad exit code, undecodable message — is
    swallowed, so the caller simply sees fewer (or zero) events.

    Returns a list of flat dicts with keys:
    time (ms epoch), type, text, tool, isError, agent.
    """
    nats_bin = str(Path.home() / "bin" / "nats")
    cutoff_ms = int((datetime.now().timestamp() - hours * 3600) * 1000)
    collected = []
    try:
        info_run = subprocess.run(
            [nats_bin, "stream", "info", "openclaw-events", "--json"],
            capture_output=True, text=True, timeout=10,
        )
        if info_run.returncode != 0:
            return collected
        stream_state = json.loads(info_run.stdout)["state"]
        last = stream_state["last_seq"]
        first = max(stream_state["first_seq"], last - 200)
        for seq in range(first, last + 1):
            try:
                get_run = subprocess.run(
                    [nats_bin, "stream", "get", "openclaw-events", str(seq), "--json"],
                    capture_output=True, text=True, timeout=2,
                )
                if get_run.returncode != 0:
                    continue
                envelope = json.loads(get_run.stdout)
                data = json.loads(base64.b64decode(envelope["data"]).decode("utf-8"))
                ts = data.get("timestamp") or data.get("timestampMs", 0)
                # Normalize seconds -> milliseconds; values above 1e12 are already ms.
                if isinstance(ts, (int, float)) and not ts > 1e12:
                    ts = int(ts * 1000)
                if ts > cutoff_ms:
                    payload = data.get("payload", {}).get("data", {})
                    collected.append({
                        "time": ts,
                        "type": data.get("type", "unknown"),
                        "text": payload.get("text", "") or "",
                        "tool": payload.get("name", ""),
                        "isError": payload.get("isError", False),
                        "agent": data.get("agent", "unknown"),
                    })
            except Exception:
                continue  # skip undecodable or timed-out messages
    except Exception:
        pass  # nats CLI missing/unreachable: return what we have (nothing)
    return collected
def detect_anomalies(events: list) -> list:
    """Scan a window of events for anomalous patterns.

    Four signals are checked against THRESHOLDS:
      - overall error rate           -> "error_spike"
      - tool-call failure rate       -> "tool_failures"
      - same question asked >= 3x    -> "repeated_questions"
      - credential-looking strings   -> "security_exposure"

    Args:
        events: flat event dicts as produced by fetch_events()
                (keys used: type, text, tool, isError).

    Returns:
        List of anomaly dicts with type/severity/message keys. Empty when
        fewer than 10 events are available (too little signal to judge).
    """
    anomalies = []
    if len(events) < 10:
        return anomalies

    # --- Error-rate spike -------------------------------------------------
    error_count = sum(
        1 for e in events
        if "error" in e["type"] or e["isError"] or "error" in e["text"].lower()
    )
    error_rate = error_count / len(events)
    if error_rate > THRESHOLDS["errorRate"]:
        anomalies.append({
            "type": "error_spike",
            # More than 30% of all events failing is escalated to critical.
            "severity": "critical" if error_rate > 0.3 else "warning",
            "message": f"High error rate: {error_rate:.1%} ({error_count}/{len(events)} events)",
        })

    # --- Tool failures ----------------------------------------------------
    tool_events = [e for e in events if e["tool"]]
    tool_errors = [e for e in tool_events if e["isError"] or "error" in e["text"].lower()]
    if len(tool_events) > 5:  # need a minimum sample before calling it a pattern
        fail_rate = len(tool_errors) / len(tool_events)
        if fail_rate > THRESHOLDS["toolFailureRate"]:
            anomalies.append({
                "type": "tool_failures",
                "severity": "warning",
                "message": f"High tool failure rate: {fail_rate:.1%}",
            })

    # --- Repeated questions ----------------------------------------------
    # Inbound message types only (type must contain both "message" and "in");
    # punctuation is stripped and keys truncated so near-identical phrasings
    # collapse into one bucket.
    user_msgs = [e["text"].lower().strip() for e in events if "message" in e["type"] and "in" in e["type"]]
    counts = {}
    for msg in user_msgs:
        if len(msg) > 10:
            key = re.sub(r"[?!.,]", "", msg)[:50]
            counts[key] = counts.get(key, 0) + 1
    repeated = [(k, c) for k, c in counts.items() if c >= THRESHOLDS["repeatedQuestions"]]
    if repeated:
        # BUG FIX: the previous message reported the number of *distinct*
        # repeated questions as "repeated questions N times", conflating the
        # question count with the repetition count.
        max_reps = max(c for _, c in repeated)
        anomalies.append({
            "type": "repeated_questions",
            "severity": "warning",
            "message": f"{len(repeated)} question(s) asked up to {max_reps} times — possible frustration",
        })

    # --- Security exposure -----------------------------------------------
    # One alert is enough regardless of how many events match; stop at first hit.
    for e in events:
        if re.search(r"(?:password|token|key|secret)[=:]\s*[^\s]{8,}", e["text"], re.IGNORECASE):
            anomalies.append({
                "type": "security_exposure",
                "severity": "critical",
                "message": "Potential credential exposure detected",
            })
            break
    return anomalies
def format_report(anomalies: list) -> str:
    """Render detected anomalies as a human-readable multi-line report."""
    if not anomalies:
        return "✅ No anomalies detected — all patterns normal"
    severity_icons = {"critical": "🔴", "warning": "🟡"}
    out = [f"⚠️ Found {len(anomalies)} anomalies:\n"]
    for entry in anomalies:
        badge = severity_icons.get(entry["severity"], "🔵")  # 🔵 = unknown severity
        out.append(f"{badge} [{entry['severity'].upper()}] {entry['type']}")
        out.append(f" {entry['message']}\n")
    return "\n".join(out)
def main():
    """CLI entry point: fetch recent events, detect anomalies, persist state."""
    parser = argparse.ArgumentParser(description="Anomaly Detector")
    parser.add_argument("--hours", type=int, default=1)
    parser.add_argument("--json", action="store_true")
    # NOTE(review): --alert is accepted but never read below — confirm intent.
    parser.add_argument("--alert", action="store_true")
    args = parser.parse_args()

    print("🚨 Anomaly Detector\n")
    state = _load_state()
    print(f"📥 Analyzing last {args.hours}h of events...")
    events = fetch_events(args.hours)
    print(f" Found {len(events)} events\n")
    if len(events) < 10:
        print("✅ Not enough events to analyze")
        return

    anomalies = detect_anomalies(events)
    print(json.dumps(anomalies, indent=2) if args.json else format_report(anomalies))

    state["lastCheck"] = int(datetime.now().timestamp() * 1000)
    state["lastAnomalies"] = anomalies
    _save_state(state)


if __name__ == "__main__":
    main()