#!/usr/bin/env python3
|
|
"""Anomaly Detector — Real-time pattern detection and alerting.
|
|
|
|
Detects error spikes, tool failures, repeated questions, security patterns.
|
|
|
|
Usage:
|
|
cortex anomaly [--hours N] [--json] [--alert]
|
|
"""
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from cortex.config import cortex_home, memory_dir
|
|
|
|
# Tunable detection thresholds used by detect_anomalies().
THRESHOLDS = {
    # Fraction of events containing errors before "error_spike" fires.
    "errorRate": 0.1,
    # Fraction of tool invocations that failed before "tool_failures" fires.
    "toolFailureRate": 0.2,
    # Minimum repetitions of a similar user message to flag frustration.
    "repeatedQuestions": 3,
    # NOTE(review): not referenced anywhere in this file — the security check
    # in detect_anomalies() uses a regex instead; confirm before removing.
    "securityKeywords": ["password", "credential", "token", "api key", "secret"],
}
|
|
|
|
|
|
def state_file():
    """Location of the persisted anomaly-detector state (JSON)."""
    return memory_dir().joinpath("anomaly-state.json")
|
|
|
|
|
|
def _load_state():
    """Load persisted detector state, falling back to a fresh default.

    Returns a dict with at least ``lastCheck`` (ms epoch of the last run)
    and ``alertedAnomalies``.  A missing, unreadable, or corrupt state file
    is treated as "no previous state" rather than an error.
    """
    sf = state_file()
    if sf.exists():
        try:
            return json.loads(sf.read_text())
        except (OSError, ValueError):
            # Narrowed from a bare `except Exception`: only I/O failures and
            # bad JSON (JSONDecodeError is a ValueError) mean "start fresh";
            # anything else should surface as a real bug.
            pass
    return {"lastCheck": 0, "alertedAnomalies": []}
|
|
|
|
|
|
def _save_state(state):
    """Persist *state* as pretty-printed JSON, creating parent dirs as needed."""
    target = state_file()
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(json.dumps(state, indent=2))
|
|
|
|
|
|
def fetch_events(hours: int = 1) -> list:
    """Fetch recent events from the ``openclaw-events`` NATS stream.

    Shells out to the ``nats`` CLI (expected at ``~/bin/nats``), scans at
    most the last 200 stream sequence numbers, base64-decodes each message
    payload, and keeps events newer than *hours* hours.  Timestamps are
    normalized to milliseconds — values <= 1e12 are assumed to be seconds.

    Best-effort by design: a missing binary, a dead stream, or any single
    undecodable message yields a partial (possibly empty) list rather than
    raising.
    """
    nats = str(Path.home() / "bin" / "nats")
    cutoff_ms = int((datetime.now().timestamp() - hours * 3600) * 1000)
    events = []

    try:
        r = subprocess.run([nats, "stream", "info", "openclaw-events", "--json"],
                           capture_output=True, text=True, timeout=10)
        if r.returncode != 0:
            return events
        info = json.loads(r.stdout)
        end_seq = info["state"]["last_seq"]
        # Cap the scan at the most recent 200 sequence numbers.
        start_seq = max(info["state"]["first_seq"], end_seq - 200)

        for seq in range(start_seq, end_seq + 1):
            try:
                r = subprocess.run(
                    [nats, "stream", "get", "openclaw-events", str(seq), "--json"],
                    capture_output=True, text=True, timeout=2,
                )
                if r.returncode != 0:
                    continue
                msg = json.loads(r.stdout)
                data = json.loads(base64.b64decode(msg["data"]).decode("utf-8"))

                ts = data.get("timestamp") or data.get("timestampMs", 0)
                if isinstance(ts, (int, float)) and ts <= 1e12:
                    ts = int(ts * 1000)  # seconds -> milliseconds

                if ts > cutoff_ms:
                    # Hoist the nested payload lookups done once per field
                    # in the original.
                    payload = data.get("payload", {}).get("data", {})
                    events.append({
                        "time": ts,
                        "type": data.get("type", "unknown"),
                        "text": payload.get("text", "") or "",
                        "tool": payload.get("name", ""),
                        "isError": payload.get("isError", False),
                        "agent": data.get("agent", "unknown"),
                    })
            except Exception:
                # One bad message must not abort the whole scan.
                continue
    except Exception:
        # No nats binary / unreachable stream: silently return what we have.
        pass

    return events
|
|
|
|
|
|
def detect_anomalies(events: list, thresholds=None) -> list:
    """Scan *events* for anomalous patterns and return a list of findings.

    Each event is a flat dict as produced by fetch_events() (keys ``type``,
    ``text``, ``tool``, ``isError``).  Each finding is a dict with ``type``,
    ``severity`` ("critical" or "warning"), and a human-readable ``message``.

    Args:
        events: recent events; fewer than 10 is treated as too little
            signal and yields no findings.
        thresholds: optional override of the module-level THRESHOLDS dict
            (backward-compatible addition; defaults to THRESHOLDS).

    Checks, in order: overall error rate, tool-failure rate, repeated user
    questions, and credential-looking strings in event text.
    """
    cfg = THRESHOLDS if thresholds is None else thresholds

    anomalies = []
    if len(events) < 10:
        return anomalies

    # --- Error rate across all events ---------------------------------
    error_count = sum(
        1 for e in events
        if "error" in e["type"] or e["isError"] or "error" in e["text"].lower()
    )
    error_rate = error_count / len(events)
    if error_rate > cfg["errorRate"]:
        anomalies.append({
            "type": "error_spike",
            # >30% errors is escalated to critical.
            "severity": "critical" if error_rate > 0.3 else "warning",
            "message": f"High error rate: {error_rate:.1%} ({error_count}/{len(events)} events)",
        })

    # --- Tool failures (needs a minimum sample of tool calls) ---------
    tool_events = [e for e in events if e["tool"]]
    tool_errors = [e for e in tool_events if e["isError"] or "error" in e["text"].lower()]
    if len(tool_events) > 5:
        fail_rate = len(tool_errors) / len(tool_events)
        if fail_rate > cfg["toolFailureRate"]:
            anomalies.append({
                "type": "tool_failures",
                "severity": "warning",
                "message": f"High tool failure rate: {fail_rate:.1%}",
            })

    # --- Repeated questions (possible user frustration) ---------------
    # Inbound user messages are identified loosely by "message"/"in"
    # substrings in the event type.
    user_msgs = [e["text"].lower().strip() for e in events if "message" in e["type"] and "in" in e["type"]]
    counts = {}
    for msg in user_msgs:
        if len(msg) > 10:
            # Strip punctuation and truncate so near-duplicates collide.
            key = re.sub(r"[?!.,]", "", msg)[:50]
            counts[key] = counts.get(key, 0) + 1
    repeated = [(k, c) for k, c in counts.items() if c >= cfg["repeatedQuestions"]]
    if repeated:
        anomalies.append({
            "type": "repeated_questions",
            "severity": "warning",
            "message": f"User repeated questions {len(repeated)} times — possible frustration",
        })

    # --- Security exposure --------------------------------------------
    # Compile once instead of per event; a single finding is enough.
    secret_re = re.compile(r"(?:password|token|key|secret)[=:]\s*[^\s]{8,}", re.IGNORECASE)
    if any(secret_re.search(e["text"]) for e in events):
        anomalies.append({
            "type": "security_exposure",
            "severity": "critical",
            "message": "Potential credential exposure detected",
        })

    return anomalies
|
|
|
|
|
|
def format_report(anomalies: list) -> str:
    """Render *anomalies* as a human-readable, emoji-prefixed summary."""
    if not anomalies:
        return "✅ No anomalies detected — all patterns normal"

    parts = [f"⚠️ Found {len(anomalies)} anomalies:\n"]
    badges = {"critical": "🔴", "warning": "🟡"}
    for item in anomalies:
        badge = badges.get(item["severity"], "🔵")
        parts.append(f"{badge} [{item['severity'].upper()}] {item['type']}")
        parts.append(f"   {item['message']}\n")
    return "\n".join(parts)
|
|
|
|
|
|
def main():
    """CLI entry point: fetch recent events, report anomalies, save state."""
    parser = argparse.ArgumentParser(description="Anomaly Detector")
    parser.add_argument("--hours", type=int, default=1)
    parser.add_argument("--json", action="store_true")
    # NOTE(review): --alert is accepted but never read in this file —
    # confirm whether alert delivery was meant to be wired up here
    # (state carries an "alertedAnomalies" list that is also never used).
    parser.add_argument("--alert", action="store_true")
    args = parser.parse_args()

    print("🚨 Anomaly Detector\n")

    state = _load_state()
    print(f"📥 Analyzing last {args.hours}h of events...")
    events = fetch_events(args.hours)
    print(f"   Found {len(events)} events\n")

    # Mirrors the minimum-sample guard inside detect_anomalies().
    # NOTE(review): this early return also skips the state save below, so
    # lastCheck does not advance on quiet periods — confirm intended.
    if len(events) < 10:
        print("✅ Not enough events to analyze")
        return

    anomalies = detect_anomalies(events)

    # NOTE(review): --json output is preceded by the banner prints above,
    # so stdout is not pure JSON for machine consumers.
    if args.json:
        print(json.dumps(anomalies, indent=2))
    else:
        print(format_report(anomalies))

    # Persist run metadata for the next invocation.
    state["lastCheck"] = int(datetime.now().timestamp() * 1000)
    state["lastAnomalies"] = anomalies
    _save_state(state)
|
|
|
|
|
|
# Allow direct execution (`python anomaly.py` / `cortex anomaly`).
if __name__ == "__main__":
    main()
|