#!/usr/bin/env python3 """Proactive Health Scanner — finds problems BEFORE they become alerts. Checks: agent health, resource trends, dependency health, configuration drift. Designed for cron: fast (<30s), no side effects. Usage: python3 health_scanner.py [--json] [--section agents|resources|deps|config] """ import argparse import hashlib import json import os import re import shutil import subprocess import sys import time import urllib.request from datetime import datetime, timedelta from pathlib import Path from typing import Any # --- Constants --- OPENCLAW_CONFIG = Path.home() / ".openclaw" / "openclaw.json" OPENCLAW_AGENTS_DIR = Path.home() / ".openclaw" / "agents" DEPRECATED_MODELS = { "anthropic/claude-3-5-haiku-latest": "2026-02-19", "claude-3-5-haiku-latest": "2026-02-19", } DISK_HISTORY_FILE = Path.home() / ".cache" / "health-scanner" / "disk_history.json" OLLAMA_URL = "http://localhost:11434/api/tags" INFO, WARN, CRITICAL = "INFO", "WARN", "CRITICAL" def _severity_rank(s: str) -> int: return {"INFO": 0, "WARN": 1, "CRITICAL": 2}.get(s, -1) class Finding: def __init__(self, section: str, severity: str, title: str, detail: str = ""): self.section = section self.severity = severity self.title = title self.detail = detail def to_dict(self) -> dict: return {"section": self.section, "severity": self.severity, "title": self.title, "detail": self.detail} class HealthScanner: def __init__(self): self.findings: list[Finding] = [] self.config: dict = {} self._load_config() def _load_config(self): try: self.config = json.loads(OPENCLAW_CONFIG.read_text()) except Exception: self.findings.append(Finding("config", CRITICAL, "Cannot read OpenClaw config", str(OPENCLAW_CONFIG))) def _add(self, section: str, severity: str, title: str, detail: str = ""): self.findings.append(Finding(section, severity, title, detail)) # --- A) Agent Health --- def check_agents(self): agents = self.config.get("agents", {}).get("list", []) if not agents: self._add("agents", WARN, "No agents found in config") return for agent in agents: aid = agent.get("id", "unknown") # Workspace check ws = agent.get("workspace", "") if ws: wp = Path(ws) if not wp.exists(): self._add("agents", CRITICAL, f"Agent '{aid}' workspace missing", ws) elif not os.access(wp, os.W_OK): self._add("agents", WARN, f"Agent '{aid}' workspace not writable", ws) else: self._add("agents", INFO, f"Agent '{aid}' workspace OK") else: self._add("agents", WARN, f"Agent '{aid}' has no workspace configured") # Session activity check session_dir = OPENCLAW_AGENTS_DIR / aid / "sessions" if session_dir.exists(): sessions = list(session_dir.iterdir()) if sessions: latest_mtime = max(f.stat().st_mtime for f in sessions if f.is_file()) age_days = (time.time() - latest_mtime) / 86400 if age_days > 7: self._add("agents", WARN, f"Agent '{aid}' inactive for {age_days:.0f} days", f"Last session activity: {datetime.fromtimestamp(latest_mtime).isoformat()}") else: self._add("agents", INFO, f"Agent '{aid}' last active {age_days:.1f} days ago") else: self._add("agents", INFO, f"Agent '{aid}' has no session files") else: self._add("agents", INFO, f"Agent '{aid}' session dir not found") # Model reachability model = agent.get("model", {}).get("primary", "") if model.startswith("ollama"): self._check_ollama_model(aid, model) elif model: self._add("agents", INFO, f"Agent '{aid}' uses cloud model: {model}") def _check_ollama_model(self, aid: str, model: str): """Check if ollama model is reachable.""" try: req = urllib.request.Request(OLLAMA_URL, method="GET") with urllib.request.urlopen(req, timeout=5) as resp: data = json.loads(resp.read()) model_name = model.split("/", 1)[-1] if "/" in model else model available = [m.get("name", "") for m in data.get("models", [])] if any(model_name in m for m in available): self._add("agents", INFO, f"Agent '{aid}' ollama model available: {model_name}") else: self._add("agents", WARN, f"Agent '{aid}' ollama model not found: {model_name}", f"Available: {', '.join(available[:10])}") except Exception as e: self._add("agents", WARN, f"Agent '{aid}' ollama unreachable for model check", str(e)) # --- B) Resource Trends --- def check_resources(self): # Disk usage self._check_disk() # Memory self._check_memory() # Session file accumulation self._check_session_files() # SQLite DB sizes self._check_db_sizes() # Log file sizes self._check_log_sizes() def _check_disk(self): usage = shutil.disk_usage("/") pct = usage.used / usage.total * 100 free_gb = usage.free / (1024 ** 3) if pct > 95: self._add("resources", CRITICAL, f"Disk {pct:.1f}% full ({free_gb:.1f}GB free)") elif pct > 85: self._add("resources", WARN, f"Disk {pct:.1f}% full ({free_gb:.1f}GB free)") else: self._add("resources", INFO, f"Disk {pct:.1f}% used ({free_gb:.1f}GB free)") # Trend tracking self._track_disk_trend(usage.used) def _track_disk_trend(self, current_bytes: int): try: DISK_HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True) history = [] if DISK_HISTORY_FILE.exists(): history = json.loads(DISK_HISTORY_FILE.read_text()) now = time.time() history.append({"ts": now, "used": current_bytes}) # Keep last 7 days cutoff = now - 7 * 86400 history = [h for h in history if h["ts"] > cutoff] DISK_HISTORY_FILE.write_text(json.dumps(history)) if len(history) >= 2: oldest, newest = history[0], history[-1] dt = newest["ts"] - oldest["ts"] if dt > 3600: # at least 1 hour of data growth_per_day = (newest["used"] - oldest["used"]) / dt * 86400 gb_per_day = growth_per_day / (1024 ** 3) if gb_per_day > 1: total = shutil.disk_usage("/").total remaining = total - current_bytes days_left = remaining / growth_per_day if growth_per_day > 0 else 999 self._add("resources", WARN, f"Disk growing {gb_per_day:.1f}GB/day, ~{days_left:.0f} days until full") except Exception: pass def _check_memory(self): try: with open("/proc/meminfo") as f: info = {} for line in f: parts = line.split(":") if len(parts) == 2: key = parts[0].strip() val = int(parts[1].strip().split()[0]) # kB info[key] = val total = info.get("MemTotal", 1) available = info.get("MemAvailable", total) used_pct = (1 - available / total) * 100 if used_pct > 90: self._add("resources", CRITICAL, f"Memory {used_pct:.0f}% used") elif used_pct > 80: self._add("resources", WARN, f"Memory {used_pct:.0f}% used") else: self._add("resources", INFO, f"Memory {used_pct:.0f}% used") except Exception as e: self._add("resources", WARN, "Cannot read memory info", str(e)) def _check_session_files(self): if not OPENCLAW_AGENTS_DIR.exists(): return for agent_dir in OPENCLAW_AGENTS_DIR.iterdir(): sessions = agent_dir / "sessions" if sessions.exists(): count = sum(1 for _ in sessions.iterdir()) if count > 1000: self._add("resources", WARN, f"Agent '{agent_dir.name}' has {count} session files") elif count > 500: self._add("resources", INFO, f"Agent '{agent_dir.name}' has {count} session files") def _check_db_sizes(self): """Check for large SQLite/DB files.""" search_paths = [ Path.home() / ".openclaw", Path(os.environ.get("CORTEX_HOME", str(Path.home() / ".cortex"))), ] for base in search_paths: if not base.exists(): continue try: for db_file in base.rglob("*.db"): size_mb = db_file.stat().st_size / (1024 ** 2) if size_mb > 500: self._add("resources", WARN, f"Large DB: {db_file} ({size_mb:.0f}MB)") for db_file in base.rglob("*.sqlite"): size_mb = db_file.stat().st_size / (1024 ** 2) if size_mb > 500: self._add("resources", WARN, f"Large DB: {db_file} ({size_mb:.0f}MB)") except PermissionError: pass def _check_log_sizes(self): log_dirs = [ Path.home() / ".openclaw" / "logs", Path(os.environ.get("CORTEX_HOME", str(Path.home() / ".cortex"))) / "logs", Path("/tmp"), ] for d in log_dirs: if not d.exists(): continue try: for f in d.iterdir(): if f.suffix in (".log", ".txt") and f.is_file(): size_mb = f.stat().st_size / (1024 ** 2) if size_mb > 100: self._add("resources", WARN, f"Large log: {f} ({size_mb:.0f}MB)") except PermissionError: pass # --- C) Dependency Health --- def check_deps(self): self._check_nats() self._check_typedb() self._check_chromadb() self._check_ollama() self._check_key_expiry() def _run_cmd(self, cmd: list[str], timeout: int = 5) -> tuple[int, str]: try: r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) return r.returncode, r.stdout.strip() except subprocess.TimeoutExpired: return -1, "timeout" except FileNotFoundError: return -2, "not found" except Exception as e: return -3, str(e) def _check_docker_container(self, name: str) -> bool: """Check if a service is running as a Docker container.""" rc, out = self._run_cmd( ["sg", "docker", "-c", f"docker ps --filter name={name} --format '{{{{.Names}}}} {{{{.Status}}}}'"], timeout=10 ) if rc == 0 and out.strip(): return True return False def _check_nats(self): rc, out = self._run_cmd(["systemctl", "--user", "is-active", "nats-server"]) if rc == 0 and out == "active": self._add("deps", INFO, "NATS server active") elif self._check_docker_container("nats"): self._add("deps", INFO, "NATS server active (Docker)") else: self._add("deps", CRITICAL, "NATS server not active", out) def _check_typedb(self): # Check for typedb process rc, out = self._run_cmd(["pgrep", "-f", "typedb"]) if rc == 0: self._add("deps", INFO, "TypeDB running") elif self._check_docker_container("typedb"): self._add("deps", INFO, "TypeDB running (Docker)") else: rc2, out2 = self._run_cmd(["systemctl", "--user", "is-active", "typedb"]) if rc2 == 0 and out2 == "active": self._add("deps", INFO, "TypeDB service active") else: self._add("deps", WARN, "TypeDB not detected", "May not be running") def _check_chromadb(self): rag_dirs = list(Path.home().glob("**/.rag-db"))[:3] rag_db = Path(os.environ.get("CORTEX_HOME", str(Path.home() / ".cortex"))) / ".rag-db" if rag_db.exists(): age_hours = (time.time() - rag_db.stat().st_mtime) / 3600 if age_hours > 48: self._add("deps", WARN, f"ChromaDB (.rag-db) last modified {age_hours:.0f}h ago") else: self._add("deps", INFO, f"ChromaDB (.rag-db) fresh ({age_hours:.0f}h)") else: self._add("deps", INFO, "No .rag-db found in workspace") def _check_ollama(self): try: req = urllib.request.Request(OLLAMA_URL, method="GET") with urllib.request.urlopen(req, timeout=5) as resp: data = json.loads(resp.read()) count = len(data.get("models", [])) self._add("deps", INFO, f"Ollama reachable ({count} models)") except Exception as e: self._add("deps", WARN, "Ollama not reachable", str(e)) def _check_key_expiry(self): """Scan env files for date-like patterns that might indicate key expiry.""" env_files = list(Path.home().glob(".config/**/*.env")) env_files += list(Path(os.environ.get("CORTEX_HOME", str(Path.home() / ".cortex"))).glob("**/.env")) env_files += [Path.home() / ".env"] now = datetime.now() date_pattern = re.compile(r'(\d{4}-\d{2}-\d{2})') for ef in env_files: if not ef.exists() or not ef.is_file(): continue try: content = ef.read_text() for match in date_pattern.finditer(content): try: d = datetime.fromisoformat(match.group(1)) if now < d < now + timedelta(days=30): self._add("deps", WARN, f"Possible expiry date {match.group(1)} in {ef.name}", str(ef)) except ValueError: pass except Exception: pass # --- D) Configuration Drift --- def check_config(self): self._check_deprecated_models() self._check_config_hash() def _check_deprecated_models(self): agents = self.config.get("agents", {}).get("list", []) defaults = self.config.get("agents", {}).get("defaults", {}) now = datetime.now() all_models = set() # Collect from defaults dm = defaults.get("model", {}) if dm.get("primary"): all_models.add(dm["primary"]) for fb in dm.get("fallbacks", []): all_models.add(fb) for m in defaults.get("models", {}): all_models.add(m) # Collect from agents for agent in agents: am = agent.get("model", {}) if am.get("primary"): all_models.add(am["primary"]) for fb in am.get("fallbacks", []): all_models.add(fb) hb = agent.get("heartbeat", {}) if hb.get("model"): all_models.add(hb["model"]) # Also check defaults heartbeat dhb = defaults.get("heartbeat", {}) if dhb.get("model"): all_models.add(dhb["model"]) for model in all_models: for dep_model, eol_date in DEPRECATED_MODELS.items(): if dep_model in model: eol = datetime.fromisoformat(eol_date) days_left = (eol - now).days if days_left < 0: self._add("config", CRITICAL, f"Model '{model}' is past EOL ({eol_date})") elif days_left < 14: self._add("config", WARN, f"Model '{model}' EOL in {days_left} days ({eol_date})") else: self._add("config", INFO, f"Model '{model}' EOL on {eol_date} ({days_left} days)") def _check_config_hash(self): """Check if config file has changed since last scan.""" hash_file = Path.home() / ".cache" / "health-scanner" / "config_hash.txt" hash_file.parent.mkdir(parents=True, exist_ok=True) try: current_hash = hashlib.sha256(OPENCLAW_CONFIG.read_bytes()).hexdigest()[:16] if hash_file.exists(): prev_hash = hash_file.read_text().strip() if prev_hash != current_hash: self._add("config", INFO, "Config file changed since last scan", f"Old: {prev_hash}, New: {current_hash}") hash_file.write_text(current_hash) except Exception as e: self._add("config", WARN, "Cannot track config hash", str(e)) # --- Run --- def run(self, sections: list[str] | None = None) -> dict: section_map = { "agents": self.check_agents, "resources": self.check_resources, "deps": self.check_deps, "config": self.check_config, } targets = sections if sections else list(section_map.keys()) for s in targets: if s in section_map: try: section_map[s]() except Exception as e: self._add(s, CRITICAL, f"Section '{s}' check failed", str(e)) # Build report worst = INFO for f in self.findings: if _severity_rank(f.severity) > _severity_rank(worst): worst = f.severity return { "timestamp": datetime.now().isoformat(), "overall": worst, "findings_count": { INFO: sum(1 for f in self.findings if f.severity == INFO), WARN: sum(1 for f in self.findings if f.severity == WARN), CRITICAL: sum(1 for f in self.findings if f.severity == CRITICAL), }, "findings": [f.to_dict() for f in self.findings], } def format_human(report: dict) -> str: lines = [] overall = report["overall"] icon = {"INFO": "✅", "WARN": "âš ī¸", "CRITICAL": "🚨"}.get(overall, "❓") lines.append(f"{icon} Health Report — {overall}") lines.append(f" {report['findings_count']}") lines.append(f" {report['timestamp']}") lines.append("") for sev in [CRITICAL, WARN, INFO]: items = [f for f in report["findings"] if f["severity"] == sev] if not items: continue icon = {"CRITICAL": "🚨", "WARN": "âš ī¸", "INFO": "â„šī¸"}[sev] lines.append(f"--- {sev} ({len(items)}) ---") for f in items: lines.append(f" {icon} [{f['section']}] {f['title']}") if f["detail"]: lines.append(f" {f['detail']}") lines.append("") return "\n".join(lines) def main(): parser = argparse.ArgumentParser(description="Proactive Health Scanner") parser.add_argument("--json", action="store_true", help="Output JSON") parser.add_argument("--section", type=str, help="Comma-separated sections: agents,resources,deps,config") args = parser.parse_args() sections = args.section.split(",") if args.section else None scanner = HealthScanner() report = scanner.run(sections) if args.json: print(json.dumps(report, indent=2)) else: print(format_human(report)) # Exit code: 2 for CRITICAL, 1 for WARN, 0 for INFO if report["overall"] == CRITICAL: sys.exit(2) elif report["overall"] == WARN: sys.exit(1) sys.exit(0) if __name__ == "__main__": main()