darkplex-core/cortex/health_scanner.py
Claudia 734f96cfcf
All checks were successful
Tests / test (push) Successful in 2s
refactor: remove all hardcoded paths, use env vars + config
All ~/clawd/ references replaced with configurable paths:
- CORTEX_HOME (default: ~/.cortex)
- CORTEX_MEMORY_DIR, CORTEX_CONFIG, CORTEX_GROWTH_LOG, CORTEX_ROADMAP
- permanent_files configurable via config.json
- Tests pass both with and without env vars set
- 169/169 tests green
2026-02-09 12:13:18 +01:00

515 lines
20 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Proactive Health Scanner — finds problems BEFORE they become alerts.
Checks: agent health, resource trends, dependency health, configuration drift.
Designed for cron: fast (<30s), no side effects.
Usage:
python3 health_scanner.py [--json] [--section agents|resources|deps|config]
"""
import argparse
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import time
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
# --- Constants ---
# Location of the OpenClaw config and per-agent state (session files etc.).
OPENCLAW_CONFIG = Path.home() / ".openclaw" / "openclaw.json"
OPENCLAW_AGENTS_DIR = Path.home() / ".openclaw" / "agents"
# Model identifiers with announced end-of-life dates (ISO YYYY-MM-DD strings);
# matched by substring against every configured model name.
DEPRECATED_MODELS = {
    "anthropic/claude-3-5-haiku-latest": "2026-02-19",
    "claude-3-5-haiku-latest": "2026-02-19",
}
# Rolling disk-usage samples (last 7 days) used for growth-trend estimation.
DISK_HISTORY_FILE = Path.home() / ".cache" / "health-scanner" / "disk_history.json"
# Local Ollama API endpoint listing installed models.
OLLAMA_URL = "http://localhost:11434/api/tags"
# Severity labels, from least to most severe.
INFO, WARN, CRITICAL = "INFO", "WARN", "CRITICAL"
def _severity_rank(s: str) -> int:
return {"INFO": 0, "WARN": 1, "CRITICAL": 2}.get(s, -1)
class Finding:
    """One health-check result: origin section, severity, title, and detail."""

    def __init__(self, section: str, severity: str, title: str, detail: str = ""):
        self.section = section
        self.severity = severity
        self.title = title
        self.detail = detail

    def to_dict(self) -> dict:
        """Serialize to a plain dict for JSON output."""
        return {
            "section": self.section,
            "severity": self.severity,
            "title": self.title,
            "detail": self.detail,
        }
class HealthScanner:
    """Runs proactive health checks and accumulates findings.

    Sections: agents, resources, deps, config. Checks are read-only except
    for small state files under ~/.cache/health-scanner/ (disk-usage history
    and the config hash). Designed for cron: fast, no other side effects.
    """

    def __init__(self):
        self.findings: list[Finding] = []
        self.config: dict = {}
        self._load_config()

    def _load_config(self) -> None:
        """Load the OpenClaw config; on any failure record a CRITICAL finding."""
        try:
            self.config = json.loads(OPENCLAW_CONFIG.read_text())
        except Exception:
            # Deliberately broad: a missing/corrupt config is itself the finding,
            # and the remaining sections should still run.
            self.findings.append(Finding("config", CRITICAL, "Cannot read OpenClaw config",
                                         str(OPENCLAW_CONFIG)))

    def _add(self, section: str, severity: str, title: str, detail: str = "") -> None:
        """Record one finding."""
        self.findings.append(Finding(section, severity, title, detail))

    # --- A) Agent Health ---

    def check_agents(self) -> None:
        """Per-agent checks: workspace presence/writability, session recency, model."""
        agents = self.config.get("agents", {}).get("list", [])
        if not agents:
            self._add("agents", WARN, "No agents found in config")
            return
        for agent in agents:
            aid = agent.get("id", "unknown")
            # Workspace check
            ws = agent.get("workspace", "")
            if ws:
                wp = Path(ws)
                if not wp.exists():
                    self._add("agents", CRITICAL, f"Agent '{aid}' workspace missing", ws)
                elif not os.access(wp, os.W_OK):
                    self._add("agents", WARN, f"Agent '{aid}' workspace not writable", ws)
                else:
                    self._add("agents", INFO, f"Agent '{aid}' workspace OK")
            else:
                self._add("agents", WARN, f"Agent '{aid}' has no workspace configured")
            # Session activity check
            session_dir = OPENCLAW_AGENTS_DIR / aid / "sessions"
            if session_dir.exists():
                # Consider regular files only. The previous version ran max()
                # over a generator filtered by is_file(), which raised
                # ValueError when the directory held only subdirectories and
                # aborted the whole agents section.
                session_files = [f for f in session_dir.iterdir() if f.is_file()]
                if session_files:
                    latest_mtime = max(f.stat().st_mtime for f in session_files)
                    age_days = (time.time() - latest_mtime) / 86400
                    if age_days > 7:
                        self._add("agents", WARN,
                                  f"Agent '{aid}' inactive for {age_days:.0f} days",
                                  f"Last session activity: {datetime.fromtimestamp(latest_mtime).isoformat()}")
                    else:
                        self._add("agents", INFO,
                                  f"Agent '{aid}' last active {age_days:.1f} days ago")
                else:
                    self._add("agents", INFO, f"Agent '{aid}' has no session files")
            else:
                self._add("agents", INFO, f"Agent '{aid}' session dir not found")
            # Model reachability
            model = agent.get("model", {}).get("primary", "")
            if model.startswith("ollama"):
                self._check_ollama_model(aid, model)
            elif model:
                self._add("agents", INFO, f"Agent '{aid}' uses cloud model: {model}")

    def _check_ollama_model(self, aid: str, model: str) -> None:
        """Check whether an agent's ollama model exists on the local server."""
        try:
            req = urllib.request.Request(OLLAMA_URL, method="GET")
            with urllib.request.urlopen(req, timeout=5) as resp:
                data = json.loads(resp.read())
            # str.split returns the whole string when "/" is absent, so the
            # previous `if "/" in model` pre-check was redundant.
            model_name = model.split("/", 1)[-1]
            available = [m.get("name", "") for m in data.get("models", [])]
            # Substring match: ollama names carry tags ("llama3:8b").
            if any(model_name in m for m in available):
                self._add("agents", INFO, f"Agent '{aid}' ollama model available: {model_name}")
            else:
                self._add("agents", WARN,
                          f"Agent '{aid}' ollama model not found: {model_name}",
                          f"Available: {', '.join(available[:10])}")
        except Exception as e:
            self._add("agents", WARN, f"Agent '{aid}' ollama unreachable for model check",
                      str(e))

    # --- B) Resource Trends ---

    def check_resources(self) -> None:
        """Disk fill/trend, memory, session-file accumulation, DB and log sizes."""
        self._check_disk()
        self._check_memory()
        self._check_session_files()
        self._check_db_sizes()
        self._check_log_sizes()

    def _check_disk(self) -> None:
        """Report root-filesystem fill level and feed the trend tracker."""
        usage = shutil.disk_usage("/")
        pct = usage.used / usage.total * 100
        free_gb = usage.free / (1024 ** 3)
        if pct > 95:
            self._add("resources", CRITICAL, f"Disk {pct:.1f}% full ({free_gb:.1f}GB free)")
        elif pct > 85:
            self._add("resources", WARN, f"Disk {pct:.1f}% full ({free_gb:.1f}GB free)")
        else:
            self._add("resources", INFO, f"Disk {pct:.1f}% used ({free_gb:.1f}GB free)")
        # Trend tracking
        self._track_disk_trend(usage.used)

    def _track_disk_trend(self, current_bytes: int) -> None:
        """Append a usage sample to the 7-day history and warn on fast growth.

        Best-effort: any I/O or JSON error is swallowed because trend data is
        advisory and must never fail the scan.
        """
        try:
            DISK_HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True)
            history = []
            if DISK_HISTORY_FILE.exists():
                history = json.loads(DISK_HISTORY_FILE.read_text())
            now = time.time()
            history.append({"ts": now, "used": current_bytes})
            # Keep last 7 days
            cutoff = now - 7 * 86400
            history = [h for h in history if h["ts"] > cutoff]
            DISK_HISTORY_FILE.write_text(json.dumps(history))
            if len(history) >= 2:
                oldest, newest = history[0], history[-1]
                dt = newest["ts"] - oldest["ts"]
                if dt > 3600:  # need at least 1 hour of data for a trend
                    growth_per_day = (newest["used"] - oldest["used"]) / dt * 86400
                    gb_per_day = growth_per_day / (1024 ** 3)
                    if gb_per_day > 1:
                        total = shutil.disk_usage("/").total
                        remaining = total - current_bytes
                        days_left = remaining / growth_per_day if growth_per_day > 0 else 999
                        self._add("resources", WARN,
                                  f"Disk growing {gb_per_day:.1f}GB/day, ~{days_left:.0f} days until full")
        except Exception:
            pass

    def _check_memory(self) -> None:
        """Report memory pressure from /proc/meminfo (Linux only)."""
        try:
            with open("/proc/meminfo") as f:
                info = {}
                for line in f:
                    parts = line.split(":")
                    if len(parts) == 2:
                        key = parts[0].strip()
                        val = int(parts[1].strip().split()[0])  # value is in kB
                        info[key] = val
            total = info.get("MemTotal", 1)
            # MemAvailable already accounts for reclaimable caches.
            available = info.get("MemAvailable", total)
            used_pct = (1 - available / total) * 100
            if used_pct > 90:
                self._add("resources", CRITICAL, f"Memory {used_pct:.0f}% used")
            elif used_pct > 80:
                self._add("resources", WARN, f"Memory {used_pct:.0f}% used")
            else:
                self._add("resources", INFO, f"Memory {used_pct:.0f}% used")
        except Exception as e:
            # Non-Linux or restricted /proc — report instead of crashing.
            self._add("resources", WARN, "Cannot read memory info", str(e))

    def _check_session_files(self) -> None:
        """Warn when an agent accumulates an excessive number of session files."""
        if not OPENCLAW_AGENTS_DIR.exists():
            return
        for agent_dir in OPENCLAW_AGENTS_DIR.iterdir():
            sessions = agent_dir / "sessions"
            if sessions.exists():
                count = sum(1 for _ in sessions.iterdir())
                if count > 1000:
                    self._add("resources", WARN,
                              f"Agent '{agent_dir.name}' has {count} session files")
                elif count > 500:
                    self._add("resources", INFO,
                              f"Agent '{agent_dir.name}' has {count} session files")

    def _check_db_sizes(self) -> None:
        """Flag SQLite/DB files larger than 500MB under known data directories."""
        search_paths = [
            Path.home() / ".openclaw",
            Path(os.environ.get("CORTEX_HOME", str(Path.home() / ".cortex"))),
        ]
        for base in search_paths:
            if not base.exists():
                continue
            try:
                # Single loop over both extensions; the original duplicated an
                # identical body for *.db and *.sqlite.
                for pattern in ("*.db", "*.sqlite"):
                    for db_file in base.rglob(pattern):
                        size_mb = db_file.stat().st_size / (1024 ** 2)
                        if size_mb > 500:
                            self._add("resources", WARN,
                                      f"Large DB: {db_file} ({size_mb:.0f}MB)")
            except PermissionError:
                pass

    def _check_log_sizes(self) -> None:
        """Flag individual log/text files larger than 100MB in known log dirs."""
        log_dirs = [
            Path.home() / ".openclaw" / "logs",
            Path(os.environ.get("CORTEX_HOME", str(Path.home() / ".cortex"))) / "logs",
            Path("/tmp"),
        ]
        for d in log_dirs:
            if not d.exists():
                continue
            try:
                for f in d.iterdir():
                    if f.suffix in (".log", ".txt") and f.is_file():
                        size_mb = f.stat().st_size / (1024 ** 2)
                        if size_mb > 100:
                            self._add("resources", WARN,
                                      f"Large log: {f} ({size_mb:.0f}MB)")
            except PermissionError:
                pass

    # --- C) Dependency Health ---

    def check_deps(self) -> None:
        """Check NATS, TypeDB, ChromaDB, Ollama and possible key-expiry dates."""
        self._check_nats()
        self._check_typedb()
        self._check_chromadb()
        self._check_ollama()
        self._check_key_expiry()

    def _run_cmd(self, cmd: list[str], timeout: int = 5) -> tuple[int, str]:
        """Run a command and return (returncode, stdout).

        Negative return codes encode failure modes: -1 timeout, -2 binary
        not found, -3 any other error.
        """
        try:
            r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
            return r.returncode, r.stdout.strip()
        except subprocess.TimeoutExpired:
            return -1, "timeout"
        except FileNotFoundError:
            return -2, "not found"
        except Exception as e:
            return -3, str(e)

    def _check_docker_container(self, name: str) -> bool:
        """Return True if a running Docker container matches *name*.

        Wrapped in `sg docker` so the check works when the scanning user has
        docker access only via group membership.
        """
        rc, out = self._run_cmd(
            ["sg", "docker", "-c", f"docker ps --filter name={name} --format '{{{{.Names}}}} {{{{.Status}}}}'"],
            timeout=10
        )
        return rc == 0 and bool(out.strip())

    def _check_nats(self) -> None:
        """NATS must run either as a user systemd unit or a Docker container."""
        rc, out = self._run_cmd(["systemctl", "--user", "is-active", "nats-server"])
        if rc == 0 and out == "active":
            self._add("deps", INFO, "NATS server active")
        elif self._check_docker_container("nats"):
            self._add("deps", INFO, "NATS server active (Docker)")
        else:
            self._add("deps", CRITICAL, "NATS server not active", out)

    def _check_typedb(self) -> None:
        """TypeDB may run as a bare process, a Docker container, or a systemd unit."""
        rc, out = self._run_cmd(["pgrep", "-f", "typedb"])
        if rc == 0:
            self._add("deps", INFO, "TypeDB running")
        elif self._check_docker_container("typedb"):
            self._add("deps", INFO, "TypeDB running (Docker)")
        else:
            rc2, out2 = self._run_cmd(["systemctl", "--user", "is-active", "typedb"])
            if rc2 == 0 and out2 == "active":
                self._add("deps", INFO, "TypeDB service active")
            else:
                self._add("deps", WARN, "TypeDB not detected", "May not be running")

    def _check_chromadb(self) -> None:
        """Check freshness of the ChromaDB store under CORTEX_HOME/.rag-db."""
        # NOTE: removed an unused `Path.home().glob("**/.rag-db")` here — it
        # recursively scanned the entire home directory and its result was
        # never read (pure wasted I/O in a <30s cron script).
        rag_db = Path(os.environ.get("CORTEX_HOME", str(Path.home() / ".cortex"))) / ".rag-db"
        if rag_db.exists():
            age_hours = (time.time() - rag_db.stat().st_mtime) / 3600
            if age_hours > 48:
                self._add("deps", WARN,
                          f"ChromaDB (.rag-db) last modified {age_hours:.0f}h ago")
            else:
                self._add("deps", INFO, f"ChromaDB (.rag-db) fresh ({age_hours:.0f}h)")
        else:
            self._add("deps", INFO, "No .rag-db found in workspace")

    def _check_ollama(self) -> None:
        """Check that the local Ollama API answers, and count installed models."""
        try:
            req = urllib.request.Request(OLLAMA_URL, method="GET")
            with urllib.request.urlopen(req, timeout=5) as resp:
                data = json.loads(resp.read())
            count = len(data.get("models", []))
            self._add("deps", INFO, f"Ollama reachable ({count} models)")
        except Exception as e:
            self._add("deps", WARN, "Ollama not reachable", str(e))

    def _check_key_expiry(self) -> None:
        """Scan env files for ISO dates within 30 days (possible key expiry)."""
        env_files = list(Path.home().glob(".config/**/*.env"))
        env_files += list(Path(os.environ.get("CORTEX_HOME", str(Path.home() / ".cortex"))).glob("**/.env"))
        env_files += [Path.home() / ".env"]
        now = datetime.now()
        date_pattern = re.compile(r'(\d{4}-\d{2}-\d{2})')
        for ef in env_files:
            if not ef.exists() or not ef.is_file():
                continue
            try:
                content = ef.read_text()
                for match in date_pattern.finditer(content):
                    try:
                        d = datetime.fromisoformat(match.group(1))
                        if now < d < now + timedelta(days=30):
                            self._add("deps", WARN,
                                      f"Possible expiry date {match.group(1)} in {ef.name}",
                                      str(ef))
                    except ValueError:
                        # Date-shaped string that isn't a real date (e.g. 2026-13-45).
                        pass
            except Exception:
                # Unreadable env file — skip; this check is best-effort.
                pass

    # --- D) Configuration Drift ---

    def check_config(self) -> None:
        """Config checks: deprecated models and config-file drift."""
        self._check_deprecated_models()
        self._check_config_hash()

    def _check_deprecated_models(self) -> None:
        """Warn about configured models that are near or past their EOL date."""
        agents = self.config.get("agents", {}).get("list", [])
        defaults = self.config.get("agents", {}).get("defaults", {})
        now = datetime.now()
        all_models: set[str] = set()

        def collect(model_cfg: dict) -> None:
            # Gather primary + fallbacks from one model-config section;
            # factored out of the previously duplicated defaults/agent code.
            if model_cfg.get("primary"):
                all_models.add(model_cfg["primary"])
            for fb in model_cfg.get("fallbacks", []):
                all_models.add(fb)

        # Collect from defaults
        collect(defaults.get("model", {}))
        for m in defaults.get("models", {}):
            all_models.add(m)
        # Collect from agents
        for agent in agents:
            collect(agent.get("model", {}))
            hb = agent.get("heartbeat", {})
            if hb.get("model"):
                all_models.add(hb["model"])
        # Also check defaults heartbeat
        dhb = defaults.get("heartbeat", {})
        if dhb.get("model"):
            all_models.add(dhb["model"])
        for model in all_models:
            for dep_model, eol_date in DEPRECATED_MODELS.items():
                if dep_model in model:
                    eol = datetime.fromisoformat(eol_date)
                    days_left = (eol - now).days
                    if days_left < 0:
                        self._add("config", CRITICAL,
                                  f"Model '{model}' is past EOL ({eol_date})")
                    elif days_left < 14:
                        self._add("config", WARN,
                                  f"Model '{model}' EOL in {days_left} days ({eol_date})")
                    else:
                        self._add("config", INFO,
                                  f"Model '{model}' EOL on {eol_date} ({days_left} days)")

    def _check_config_hash(self) -> None:
        """Detect config edits between scans via a persisted short SHA-256."""
        hash_file = Path.home() / ".cache" / "health-scanner" / "config_hash.txt"
        hash_file.parent.mkdir(parents=True, exist_ok=True)
        try:
            current_hash = hashlib.sha256(OPENCLAW_CONFIG.read_bytes()).hexdigest()[:16]
            if hash_file.exists():
                prev_hash = hash_file.read_text().strip()
                if prev_hash != current_hash:
                    self._add("config", INFO, "Config file changed since last scan",
                              f"Old: {prev_hash}, New: {current_hash}")
            hash_file.write_text(current_hash)
        except Exception as e:
            self._add("config", WARN, "Cannot track config hash", str(e))

    # --- Run ---

    def run(self, sections: list[str] | None = None) -> dict:
        """Execute the requested sections (default: all) and return a report.

        A crash in one section is downgraded to a CRITICAL finding so the
        remaining sections still run. Unknown section names are ignored.
        """
        section_map = {
            "agents": self.check_agents,
            "resources": self.check_resources,
            "deps": self.check_deps,
            "config": self.check_config,
        }
        targets = sections if sections else list(section_map.keys())
        for s in targets:
            if s in section_map:
                try:
                    section_map[s]()
                except Exception as e:
                    self._add(s, CRITICAL, f"Section '{s}' check failed", str(e))
        # Overall status is the worst severity seen (INFO when none).
        worst = INFO
        for f in self.findings:
            if _severity_rank(f.severity) > _severity_rank(worst):
                worst = f.severity
        return {
            "timestamp": datetime.now().isoformat(),
            "overall": worst,
            "findings_count": {
                INFO: sum(1 for f in self.findings if f.severity == INFO),
                WARN: sum(1 for f in self.findings if f.severity == WARN),
                CRITICAL: sum(1 for f in self.findings if f.severity == CRITICAL),
            },
            "findings": [f.to_dict() for f in self.findings],
        }
def format_human(report: dict) -> str:
    """Render a scan report as readable text: header, then findings grouped
    by severity from worst to best."""
    icons = {"CRITICAL": "🚨", "WARN": "⚠️", "INFO": ""}
    overall = report["overall"]
    out = [
        f"{icons.get(overall, '')} Health Report — {overall}",
        f" {report['findings_count']}",
        f" {report['timestamp']}",
        "",
    ]
    for severity in ("CRITICAL", "WARN", "INFO"):
        group = [item for item in report["findings"] if item["severity"] == severity]
        if not group:
            continue
        badge = icons[severity]
        out.append(f"--- {severity} ({len(group)}) ---")
        for item in group:
            out.append(f" {badge} [{item['section']}] {item['title']}")
            if item["detail"]:
                out.append(f" {item['detail']}")
        out.append("")
    return "\n".join(out)
def main():
    """CLI entry point: run the scan, print the report, exit by worst severity."""
    parser = argparse.ArgumentParser(description="Proactive Health Scanner")
    parser.add_argument("--json", action="store_true", help="Output JSON")
    parser.add_argument("--section", type=str, help="Comma-separated sections: agents,resources,deps,config")
    args = parser.parse_args()
    selected = args.section.split(",") if args.section else None
    report = HealthScanner().run(selected)
    if args.json:
        print(json.dumps(report, indent=2))
    else:
        print(format_human(report))
    # Exit code: 2 for CRITICAL, 1 for WARN, 0 for INFO
    exit_codes = {CRITICAL: 2, WARN: 1}
    sys.exit(exit_codes.get(report["overall"], 0))


if __name__ == "__main__":
    main()