#!/usr/bin/env python3
"""Needs System — Self-Monitoring & Self-Healing Loop.

Monitors functional needs: context, health, energy, connection, growth.
Each need has sensors, thresholds, self-heal actions, and escalation.

Usage:
    cortex needs [--json] [--quiet]
"""

import argparse
import json
import subprocess
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional

from cortex.config import cortex_home, memory_dir

# Relative weight of each need in the overall wellbeing score; a need that is
# not listed here falls back to _DEFAULT_WEIGHT.
_NEED_WEIGHTS = {
    "context": 0.3,
    "health": 0.3,
    "energy": 0.15,
    "connection": 0.1,
    "growth": 0.15,
}
_DEFAULT_WEIGHT = 0.2

# Number of history entries kept in the wellbeing file.
_HISTORY_LIMIT = 48


@dataclass
class Need:
    """Result of one sensor run for a single need."""

    name: str
    level: float  # 0.0 (critical) to 1.0 (fully satisfied)
    # "satisfied", "low", "critical"; assess_wellbeing() uses "unknown" when
    # the sensor itself raised.
    status: str
    details: str
    healed: list = field(default_factory=list)
    escalate: list = field(default_factory=list)


@dataclass
class Wellbeing:
    """Aggregated snapshot over all needs, as produced by assess_wellbeing()."""

    timestamp: str
    overall: float
    status: str
    needs: dict
    healed: list
    escalations: list
    history_trend: str


def _run_cmd(cmd, timeout=10):
    """Run *cmd* and return ``(returncode, stdout, stderr)``.

    Never raises: a timeout yields ``(-1, "", "timeout")`` and any other
    failure (e.g. the binary is missing) yields ``(-1, "", str(error))``.
    """
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return -1, "", "timeout"
    except Exception as e:
        # Sensors treat every failure as "command did not succeed".
        return -1, "", str(e)
    return r.returncode, r.stdout, r.stderr


def _try_heal(action_name, cmd, timeout=30):
    """Run a self-heal command and return ``(succeeded, message)``.

    Success means exit code 0. On failure the message carries the first
    80 characters of stderr, or the exception text if the command could
    not be run at all.
    """
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except Exception as e:
        return False, f"❌ {action_name}: {e}"
    if r.returncode == 0:
        return True, f"✅ {action_name}: success"
    return False, f"❌ {action_name}: failed ({r.stderr[:80]})"


def _classify(score):
    """Map a 0.0–1.0 score to "satisfied" (> 0.7), "low" (> 0.3) or "critical"."""
    if score > 0.7:
        return "satisfied"
    if score > 0.3:
        return "low"
    return "critical"


def _make_need(name, score, details, healed=None, escalate=None):
    """Build a Need from a raw score, clamping it at 0.0 and classifying it."""
    level = max(0.0, score)
    return Need(
        name,
        level,
        _classify(level),
        "; ".join(details),
        [] if healed is None else healed,
        [] if escalate is None else escalate,
    )


def _age_seconds(path) -> Optional[float]:
    """Return seconds since *path* was last modified, or None if it is missing."""
    if not path.exists():
        return None
    return datetime.now().timestamp() - path.stat().st_mtime


def _parse_df_percent(out) -> Optional[int]:
    """Extract the usage percentage from ``df --output=pcent`` output.

    The value is read from the second line with its trailing ``%`` removed.
    Returns None when that line is missing or is not an integer, so an odd
    ``df`` output skips the disk check instead of failing the whole sensor.
    """
    lines = out.strip().split("\n")
    if len(lines) < 2:
        return None
    try:
        return int(lines[1].strip().rstrip("%"))
    except ValueError:
        return None


def wellbeing_file():
    """Return the path of the persisted wellbeing snapshot."""
    return memory_dir() / "wellbeing.json"


def sense_context() -> Need:
    """Score the presence and freshness of the context files.

    Checks WORKING.md and BOOT_CONTEXT.md in the memory directory and
    MEMORY.md in ``~/clawd`` or the cortex home directory.
    """
    score, details, healed, escalate = 1.0, [], [], []
    mem = memory_dir()

    working_age = _age_seconds(mem / "WORKING.md")
    if working_age is None:
        score -= 0.4
        details.append("WORKING.md missing")
    else:
        age_h = working_age / 3600
        if age_h > 8:
            score -= 0.3
            details.append(f"WORKING.md stale ({age_h:.0f}h)")
        elif age_h > 4:
            score -= 0.1
            details.append(f"WORKING.md somewhat old ({age_h:.1f}h)")

    boot_age = _age_seconds(mem / "BOOT_CONTEXT.md")
    if boot_age is None:
        score -= 0.2
        details.append("BOOT_CONTEXT.md missing")
    else:
        age_h = boot_age / 3600
        if age_h > 4:
            score -= 0.15
            details.append(f"BOOT_CONTEXT.md stale ({age_h:.0f}h)")

    # Only the first existing candidate is inspected.
    candidates = [Path.home() / "clawd" / "MEMORY.md", cortex_home() / "MEMORY.md"]
    memory_md = next((p for p in candidates if p.exists()), None)
    if memory_md is None:
        score -= 0.3
        details.append("MEMORY.md not found")
    elif memory_md.stat().st_size < 100:
        score -= 0.3
        details.append("MEMORY.md nearly empty")

    if not details:
        details.append("Context complete and fresh")
    return _make_need("context", score, details, healed, escalate)


def sense_health() -> Need:
    """Score service and disk health.

    Checks the ``event-consumer`` user unit via systemctl, the NATS
    connection via ``~/bin/nats`` and the usage of ``/`` via df. An
    unreachable NATS server is also added to the escalation list.
    """
    score, details, healed, escalate = 1.0, [], [], []

    # Only the printed state matters here; the exit code is ignored.
    _, out, _ = _run_cmd(["systemctl", "--user", "is-active", "event-consumer"])
    if out.strip() != "active":
        score -= 0.3
        details.append("Event Consumer inactive")

    nats_bin = str(Path.home() / "bin" / "nats")
    rc, _, _ = _run_cmd([nats_bin, "server", "check", "connection"])
    if rc != 0:
        score -= 0.4
        details.append("NATS unreachable")
        escalate.append("NATS Server down")

    rc, out, _ = _run_cmd(["df", "--output=pcent", "/"])
    usage = _parse_df_percent(out) if rc == 0 else None
    if usage is not None:
        if usage > 90:
            score -= 0.3
            details.append(f"Disk {usage}% full")
        elif usage > 80:
            score -= 0.1
            details.append(f"Disk {usage}% full")

    if not details:
        details.append("All systems healthy")
    return _make_need("health", score, details, healed, escalate)


def sense_energy() -> Need:
    """Score context pressure from the combined size of the workspace files.

    Sums the sizes of the existing files among SOUL.md, AGENTS.md, TOOLS.md,
    MEMORY.md and USER.md in ``~/clawd``; more than 30 KB costs 0.2.
    """
    score, details = 1.0, []
    clawd = Path.home() / "clawd"

    total_ctx_bytes = 0
    for filename in ["SOUL.md", "AGENTS.md", "TOOLS.md", "MEMORY.md", "USER.md"]:
        p = clawd / filename
        if p.exists():
            total_ctx_bytes += p.stat().st_size
    total_ctx_kb = total_ctx_bytes / 1024

    if total_ctx_kb > 30:
        score -= 0.2
        details.append(f"Workspace files {total_ctx_kb:.0f}KB — context pressure")
    else:
        details.append(f"Workspace files {total_ctx_kb:.0f}KB — efficient")

    return _make_need("energy", score, details)


def sense_connection() -> Need:
    """Report the connection need; the score is never reduced here.

    Only the detail text varies: night mode between 23:00 and 07:59
    according to ``datetime.now()``, normal otherwise.
    """
    score, details = 1.0, []
    hour = datetime.now().hour
    if hour >= 23 or hour < 8:
        details.append("Night mode — no interaction expected")
    else:
        details.append("Connection status normal")
    return _make_need("connection", score, details)


def sense_growth() -> Need:
    """Score the age of the RAG index at ``~/clawd/.rag-db/chroma.sqlite3``."""
    score, details = 1.0, []
    age = _age_seconds(Path.home() / "clawd" / ".rag-db" / "chroma.sqlite3")

    if age is None:
        score -= 0.3
        details.append("RAG DB missing")
    else:
        age_days = age / 86400
        if age_days > 14:
            score -= 0.3
            details.append(f"RAG Index {age_days:.0f} days old")
        elif age_days > 7:
            score -= 0.1
            details.append(f"RAG Index {age_days:.0f} days old")
        else:
            details.append(f"RAG Index fresh ({age_days:.1f} days)")

    return _make_need("growth", score, details)


def assess_wellbeing() -> Wellbeing:
    """Run every sensor and combine the results into a Wellbeing snapshot.

    A sensor that raises is recorded as a Need with level 0.5 and status
    "unknown" instead of aborting the assessment. The overall score is the
    weighted mean of the need levels; the status is derived from the
    unrounded score, while the stored ``overall`` is rounded to 2 decimals.
    """
    needs = {}
    all_healed, all_escalations = [], []

    for sensor in [sense_context, sense_health, sense_energy, sense_connection, sense_growth]:
        try:
            need = sensor()
            needs[need.name] = need
            all_healed.extend(need.healed)
            all_escalations.extend(need.escalate)
        except Exception as e:
            name = sensor.__name__.replace("sense_", "")
            needs[name] = Need(name, 0.5, "unknown", f"Sensor error: {e}")

    weighted_sum = sum(needs[n].level * _NEED_WEIGHTS.get(n, _DEFAULT_WEIGHT) for n in needs)
    total_weight = sum(_NEED_WEIGHTS.get(n, _DEFAULT_WEIGHT) for n in needs)
    overall = weighted_sum / total_weight

    if overall > 0.8:
        status = "thriving"
    elif overall > 0.6:
        status = "okay"
    elif overall > 0.3:
        status = "struggling"
    else:
        status = "critical"

    trend = _compute_trend(overall)

    return Wellbeing(
        timestamp=datetime.now().isoformat(),
        overall=round(overall, 2),
        status=status,
        needs={n: asdict(need) for n, need in needs.items()},
        healed=[h for h in all_healed if h],
        escalations=all_escalations,
        history_trend=trend,
    )


def _compute_trend(current):
    """Compare *current* with the mean of the last five stored overall scores.

    Returns "improving" or "declining" when the difference exceeds 0.1,
    "stable" otherwise (including when there is no history yet), and
    "unknown" if the history cannot be read or interpreted.
    """
    try:
        wf = wellbeing_file()
        if wf.exists():
            history = json.loads(wf.read_text(encoding="utf-8"))
            past = [h.get("overall", 0.5) for h in history.get("history", [])]
            if past:
                recent = past[-5:]
                avg = sum(recent) / len(recent)
                if current > avg + 0.1:
                    return "improving"
                if current < avg - 0.1:
                    return "declining"
        return "stable"
    except Exception:
        # Best effort: a damaged history file must not break the assessment.
        return "unknown"


def _load_history(wf):
    """Return the history list stored in *wf*, or ``[]`` if it is unusable.

    Unreadable files, invalid JSON and unexpected shapes (top level not a
    dict, "history" not a list) all start a fresh history.
    """
    if not wf.exists():
        return []
    try:
        stored = json.loads(wf.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both invalid JSON and undecodable bytes.
        return []
    history = stored.get("history", []) if isinstance(stored, dict) else []
    return history if isinstance(history, list) else []


def save_wellbeing(wb: Wellbeing):
    """Persist *wb* together with a rolling history of overall scores.

    The history keeps the newest 48 entries of timestamp, overall and
    status. The parent directory is created if needed.
    """
    data = asdict(wb)
    wf = wellbeing_file()
    wf.parent.mkdir(parents=True, exist_ok=True)

    history = _load_history(wf)
    history.append({"timestamp": wb.timestamp, "overall": wb.overall, "status": wb.status})
    data["history"] = history[-_HISTORY_LIMIT:]

    # ensure_ascii=False emits raw non-ASCII text, so the encoding is pinned
    # instead of depending on the locale.
    wf.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")


def format_status(wb: Wellbeing) -> str:
    """Render *wb* as a multi-line, human-readable report."""
    emoji = {"thriving": "🌟", "okay": "😊", "struggling": "😟", "critical": "🆘"}.get(wb.status, "❓")
    trend_emoji = {"improving": "📈", "stable": "➡️", "declining": "📉"}.get(wb.history_trend, "❓")
    lines = [f"{emoji} Wellbeing: {wb.overall:.0%} ({wb.status}) {trend_emoji} {wb.history_trend}", ""]

    need_emoji = {"context": "🧠", "health": "💊", "energy": "⚡", "connection": "💬", "growth": "🌱"}
    for name, data in wb.needs.items():
        ne = need_emoji.get(name, "•")
        filled = int(data["level"] * 10)  # ten-segment bar
        bar = "█" * filled + "░" * (10 - filled)
        lines.append(f" {ne} {name:12s} [{bar}] {data['level']:.0%} — {data['details']}")

    if wb.healed:
        lines.extend(["", "🔧 Self-Healed:"] + [f" {h}" for h in wb.healed])
    if wb.escalations:
        lines.extend(["", "⚠️ Need attention:"] + [f" → {e}" for e in wb.escalations])
    return "\n".join(lines)


def main():
    """CLI entry point: assess, persist, then print according to the flags.

    ``--json`` prints the snapshot as JSON. ``--quiet`` prints the report
    only when the status is not "thriving" or there are escalations.
    Without flags the report is always printed.
    """
    parser = argparse.ArgumentParser(description="Needs System — Self-Monitoring")
    parser.add_argument("--json", action="store_true")
    parser.add_argument("--quiet", "-q", action="store_true")
    args = parser.parse_args()

    wb = assess_wellbeing()
    save_wellbeing(wb)

    if args.json:
        print(json.dumps(asdict(wb), indent=2, ensure_ascii=False))
    elif args.quiet:
        if wb.status != "thriving" or wb.escalations:
            print(format_status(wb))
    else:
        print(format_status(wb))


if __name__ == "__main__":
    main()