327 lines
10 KiB
Python
327 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""Needs System — Self-Monitoring & Self-Healing Loop.
|
|
|
|
Monitors functional needs: context, health, energy, connection, growth.
|
|
Each need has sensors, thresholds, self-heal actions, and escalation.
|
|
|
|
Usage:
|
|
cortex needs [--json] [--quiet]
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import asdict, dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from cortex.config import cortex_home, memory_dir
|
|
|
|
|
|
@dataclass
class Need:
    """Result of one sensor: how well a single functional need is currently met."""

    # Need identifier, e.g. "context", "health", "energy", "connection", "growth".
    name: str
    level: float # 0.0 (critical) to 1.0 (fully satisfied)
    # "satisfied", "low" or "critical" via _classify(); assess_wellbeing() uses
    # "unknown" when the sensor itself raised.
    status: str # "satisfied", "low", "critical"
    # Human-readable findings, joined with "; " by the sensors.
    details: str
    # Messages describing self-heal actions taken by the sensor.
    healed: list = field(default_factory=list)
    # Messages for problems the sensor could not fix and that need attention.
    escalate: list = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class Wellbeing:
    """Aggregate snapshot of all needs, as built by assess_wellbeing()."""

    # Local time of the assessment, from datetime.now().isoformat().
    timestamp: str
    # Weighted mean of the need levels, rounded to 2 decimals.
    overall: float
    # "thriving", "okay", "struggling" or "critical".
    status: str
    # Need name -> asdict(Need) for that need.
    needs: dict
    # Non-empty heal messages collected from every need.
    healed: list
    # Escalation messages collected from every need.
    escalations: list
    # "improving", "stable", "declining" or "unknown" (see _compute_trend).
    history_trend: str
|
|
|
|
|
|
def _run_cmd(cmd, timeout=10):
|
|
try:
|
|
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
|
return r.returncode, r.stdout, r.stderr
|
|
except subprocess.TimeoutExpired:
|
|
return -1, "", "timeout"
|
|
except Exception as e:
|
|
return -1, "", str(e)
|
|
|
|
|
|
def _try_heal(action_name, cmd, timeout=30):
|
|
try:
|
|
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
|
|
if r.returncode == 0:
|
|
return True, f"✅ {action_name}: success"
|
|
return False, f"❌ {action_name}: failed ({r.stderr[:80]})"
|
|
except Exception as e:
|
|
return False, f"❌ {action_name}: {e}"
|
|
|
|
|
|
def _classify(score):
|
|
return "satisfied" if score > 0.7 else "low" if score > 0.3 else "critical"
|
|
|
|
|
|
def wellbeing_file():
    """Return the path of the JSON file that stores the latest snapshot and its history."""
    base = memory_dir()
    return base / "wellbeing.json"
|
|
|
|
|
|
def sense_context():
    """Sense the "context" need: presence and freshness of the memory files.

    Checks, each lowering the score from 1.0:
      * ``WORKING.md`` in ``memory_dir()``: -0.3 if older than 8h, -0.1 if
        older than 4h, -0.4 if missing.
      * ``BOOT_CONTEXT.md`` in ``memory_dir()``: -0.15 if older than 4h,
        -0.2 if missing.
      * ``MEMORY.md``: looked up first in the workspace directory
        (``DARKPLEX_WORKSPACE``, else ``OPENCLAW_WORKSPACE``, else the home
        directory), then in ``cortex_home()``. -0.3 if the first one found is
        under 100 bytes, -0.3 if neither exists.

    Returns:
        Need: ``name="context"`` with the score clamped at 0.0. This sensor
        performs no healing and raises no escalations.
    """
    score, details, healed, escalate = 1.0, [], [], []
    mem = memory_dir()
    now = datetime.now().timestamp()  # one reference time for all age checks

    working = mem / "WORKING.md"
    if working.exists():
        age_h = (now - working.stat().st_mtime) / 3600
        if age_h > 8:
            score -= 0.3
            details.append(f"WORKING.md stale ({age_h:.0f}h)")
        elif age_h > 4:
            score -= 0.1
            details.append(f"WORKING.md somewhat old ({age_h:.1f}h)")
    else:
        score -= 0.4
        details.append("WORKING.md missing")

    boot_ctx = mem / "BOOT_CONTEXT.md"
    if boot_ctx.exists():
        age_h = (now - boot_ctx.stat().st_mtime) / 3600
        if age_h > 4:
            score -= 0.15
            details.append(f"BOOT_CONTEXT.md stale ({age_h:.0f}h)")
    else:
        score -= 0.2
        details.append("BOOT_CONTEXT.md missing")

    # MEMORY.md: the workspace copy takes precedence over the cortex_home() one.
    workspace = Path(os.environ.get(
        "DARKPLEX_WORKSPACE",
        os.environ.get("OPENCLAW_WORKSPACE", str(Path.home())),
    ))
    for candidate in (workspace / "MEMORY.md", cortex_home() / "MEMORY.md"):
        if candidate.exists():
            if candidate.stat().st_size < 100:
                score -= 0.3
                details.append("MEMORY.md nearly empty")
            break
    else:
        score -= 0.3
        details.append("MEMORY.md not found")

    if not details:
        details.append("Context complete and fresh")

    level = max(0.0, score)
    return Need("context", level, _classify(level), "; ".join(details), healed, escalate)
|
|
|
|
|
|
def sense_health():
    """Sense the "health" need: background services and disk space.

    Checks, each lowering the score from 1.0:
      * the user systemd unit named by ``DARKPLEX_EVENT_SERVICE`` (default
        ``event-consumer``): -0.3 unless ``systemctl --user is-active``
        prints "active".
      * NATS via ``~/bin/nats server check connection``: -0.4 plus an
        escalation on any non-zero exit, including when the CLI cannot be run.
      * root filesystem usage from ``df --output=pcent /``: -0.3 above 90%,
        -0.1 above 80%. Output that cannot be parsed skips this check instead
        of failing the whole sensor.

    Returns:
        Need: ``name="health"`` with the score clamped at 0.0; ``escalate``
        holds "NATS Server down" when NATS is unreachable.
    """
    score, details, healed, escalate = 1.0, [], [], []

    service = os.environ.get("DARKPLEX_EVENT_SERVICE", "event-consumer")
    _, out, _ = _run_cmd(["systemctl", "--user", "is-active", service])
    if out.strip() != "active":
        score -= 0.3
        details.append("Event Consumer inactive")

    nats_bin = str(Path.home() / "bin" / "nats")
    rc, _, _ = _run_cmd([nats_bin, "server", "check", "connection"])
    if rc != 0:
        score -= 0.4
        details.append("NATS unreachable")
        escalate.append("NATS Server down")

    usage = None
    rc, out, _ = _run_cmd(["df", "--output=pcent", "/"])
    if rc == 0:
        lines = out.strip().splitlines()
        if len(lines) >= 2:
            try:
                usage = int(lines[1].strip().rstrip("%"))
            except ValueError:
                # Unexpected df output: skip the disk check. Raising here would
                # make assess_wellbeing() discard the findings and escalation above.
                usage = None
    if usage is not None:
        if usage > 90:
            score -= 0.3
            details.append(f"Disk {usage}% full")
        elif usage > 80:
            score -= 0.1
            details.append(f"Disk {usage}% full")

    if not details:
        details.append("All systems healthy")

    level = max(0.0, score)
    return Need("health", level, _classify(level), "; ".join(details), healed, escalate)
|
|
|
|
|
|
def sense_energy():
    """Sense the "energy" need: the size of the always-loaded workspace files.

    Sums the sizes of SOUL.md, AGENTS.md, TOOLS.md, MEMORY.md and USER.md in
    the workspace directory (``DARKPLEX_WORKSPACE``, else
    ``OPENCLAW_WORKSPACE``, else the home directory). Missing files count as
    zero. More than 30 KB in total is reported as context pressure (-0.2).

    Returns:
        Need: ``name="energy"`` with exactly one detail line; no healing or
        escalation is performed.
    """
    workspace = Path(os.environ.get(
        "DARKPLEX_WORKSPACE",
        os.environ.get("OPENCLAW_WORKSPACE", str(Path.home())),
    ))
    total_ctx_bytes = 0
    for fname in ("SOUL.md", "AGENTS.md", "TOOLS.md", "MEMORY.md", "USER.md"):
        path = workspace / fname
        if path.exists():
            total_ctx_bytes += path.stat().st_size

    score = 1.0
    total_ctx_kb = total_ctx_bytes / 1024
    # Both branches produce a detail, so no "all good" fallback is needed.
    if total_ctx_kb > 30:
        score -= 0.2
        detail = f"Workspace files {total_ctx_kb:.0f}KB — context pressure"
    else:
        detail = f"Workspace files {total_ctx_kb:.0f}KB — efficient"

    level = max(0.0, score)
    return Need("energy", level, _classify(level), detail, [], [])
|
|
|
|
|
|
def sense_connection():
    """Sense the "connection" need.

    This sensor is informational only: the level is always 1.0, and the detail
    merely notes whether the local hour falls in the night window
    (23:00-08:00), when no interaction is expected.
    """
    hour = datetime.now().hour
    is_night = hour >= 23 or hour < 8
    detail = "Night mode — no interaction expected" if is_night else "Connection status normal"
    level = 1.0
    return Need("connection", level, _classify(level), detail, [], [])
|
|
|
|
|
|
def sense_growth():
    """Sense the "growth" need: freshness of the RAG vector index.

    Looks at ``.rag-db/chroma.sqlite3`` in the workspace directory
    (``DARKPLEX_WORKSPACE``, else ``OPENCLAW_WORKSPACE``, else the home
    directory). The score drops by 0.3 if the file is missing or older than
    14 days, and by 0.1 if it is older than 7 days.

    Returns:
        Need: ``name="growth"`` with exactly one detail line; no healing or
        escalation is performed.
    """
    workspace = Path(os.environ.get(
        "DARKPLEX_WORKSPACE",
        os.environ.get("OPENCLAW_WORKSPACE", str(Path.home())),
    ))
    rag_db = workspace / ".rag-db" / "chroma.sqlite3"

    score = 1.0
    # Every path below sets a detail, so no "all good" fallback is needed.
    if not rag_db.exists():
        score -= 0.3
        detail = "RAG DB missing"
    else:
        age_days = (datetime.now().timestamp() - rag_db.stat().st_mtime) / 86400
        if age_days > 14:
            score -= 0.3
            detail = f"RAG Index {age_days:.0f} days old"
        elif age_days > 7:
            score -= 0.1
            detail = f"RAG Index {age_days:.0f} days old"
        else:
            detail = f"RAG Index fresh ({age_days:.1f} days)"

    level = max(0.0, score)
    return Need("growth", level, _classify(level), detail, [], [])
|
|
|
|
|
|
def assess_wellbeing() -> Wellbeing:
    """Run every need sensor and fold the results into one Wellbeing snapshot.

    Sensors are isolated from each other. If one raises, its need is recorded
    with level 0.5, status "unknown" and the error text, and the remaining
    sensors still run. ``overall`` is the weighted mean of the need levels
    (needs absent from the weight table count with weight 0.2). It maps to
    > 0.8 "thriving", > 0.6 "okay", > 0.3 "struggling", otherwise "critical".
    Falsy heal messages are dropped from the snapshot.
    """
    weights = {"context": 0.3, "health": 0.3, "energy": 0.15, "connection": 0.1, "growth": 0.15}
    bands = ((0.8, "thriving"), (0.6, "okay"), (0.3, "struggling"))

    needs = {}
    heal_log, escalation_log = [], []
    for probe in (sense_context, sense_health, sense_energy, sense_connection, sense_growth):
        try:
            result = probe()
            needs[result.name] = result
            heal_log.extend(result.healed)
            escalation_log.extend(result.escalate)
        except Exception as exc:
            key = probe.__name__.replace("sense_", "")
            needs[key] = Need(key, 0.5, "unknown", f"Sensor error: {exc}")

    weighted_total = sum(need.level * weights.get(key, 0.2) for key, need in needs.items())
    weight_total = sum(weights.get(key, 0.2) for key in needs)
    overall = weighted_total / weight_total

    status = next((band for floor, band in bands if overall > floor), "critical")
    trend = _compute_trend(overall)

    return Wellbeing(
        timestamp=datetime.now().isoformat(),
        overall=round(overall, 2),
        status=status,
        needs={key: asdict(need) for key, need in needs.items()},
        healed=list(filter(None, heal_log)),
        escalations=escalation_log,
        history_trend=trend,
    )
|
|
|
|
|
|
def _compute_trend(current):
    """Compare *current* with the recent history stored in ``wellbeing_file()``.

    Averages the ``overall`` values of the last five history entries. Entries
    without the key count as 0.5.

    Returns:
        "improving" if *current* exceeds that average by more than 0.1,
        "declining" if it is more than 0.1 below it, and "stable" otherwise
        (including when no file or no history exists yet). Returns "unknown"
        if the file cannot be read or does not have the expected shape.
    """
    try:
        wf = wellbeing_file()
        if not wf.exists():
            return "stable"
        # Must match the encoding save_wellbeing() writes with.
        stored = json.loads(wf.read_text(encoding="utf-8"))
        recent = [h.get("overall", 0.5) for h in stored.get("history", [])][-5:]
        if not recent:
            return "stable"
        avg = sum(recent) / len(recent)
        if current > avg + 0.1:
            return "improving"
        if current < avg - 0.1:
            return "declining"
        return "stable"
    except Exception:
        # The trend is advisory: any unreadable or malformed history degrades to "unknown".
        return "unknown"


def save_wellbeing(wb: Wellbeing):
    """Persist *wb* to ``wellbeing_file()`` along with a rolling history.

    The file holds the full snapshot plus a ``history`` list of
    ``{timestamp, overall, status}`` entries, capped at the latest 48. An
    existing file that cannot be read or parsed is treated as having no
    history. The JSON is UTF-8 encoded (``ensure_ascii=False`` keeps
    non-ASCII detail text verbatim). It is written to a temporary sibling file
    and moved into place with ``os.replace``, so an interrupted write cannot
    leave a truncated file that would wipe the history on the next run.
    """
    data = asdict(wb)
    wf = wellbeing_file()
    wf.parent.mkdir(parents=True, exist_ok=True)

    history = []
    if wf.exists():
        try:
            history = json.loads(wf.read_text(encoding="utf-8")).get("history", [])
        except (OSError, ValueError, AttributeError):
            # Unreadable, undecodable, malformed or non-object JSON: start a fresh history.
            history = []
        if not isinstance(history, list):
            history = []

    history.append({"timestamp": wb.timestamp, "overall": wb.overall, "status": wb.status})
    data["history"] = history[-48:]

    tmp = wf.with_name(f"{wf.name}.{os.getpid()}.tmp")
    try:
        tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
        os.replace(tmp, wf)
    except BaseException:
        # Do not leave a stray temp file behind; re-raise the original error.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
|
|
|
|
|
|
def format_status(wb: Wellbeing) -> str:
    """Render *wb* as a human-readable, multi-line report.

    Layout:
      * a headline with status and trend glyphs,
      * a blank line,
      * one row per need with a 10-cell level bar,
      * optional "Self-Healed" and "Need attention" sections, each preceded by
        a blank line.

    Unrecognised status or trend values are shown with "❓".
    """
    status_icons = {"thriving": "🌟", "okay": "😊", "struggling": "😟", "critical": "🆘"}
    trend_icons = {"improving": "📈", "stable": "➡️", "declining": "📉"}
    need_icons = {"context": "🧠", "health": "💊", "energy": "⚡", "connection": "💬", "growth": "🌱"}

    status_icon = status_icons.get(wb.status, "❓")
    trend_icon = trend_icons.get(wb.history_trend, "❓")
    out = [f"{status_icon} Wellbeing: {wb.overall:.0%} ({wb.status}) {trend_icon} {wb.history_trend}", ""]

    for need_name, info in wb.needs.items():
        icon = need_icons.get(need_name, "•")
        filled = int(info["level"] * 10)
        bar = "█" * filled + "░" * (10 - filled)
        out.append(f" {icon} {need_name:12s} [{bar}] {info['level']:.0%} — {info['details']}")

    if wb.healed:
        out.append("")
        out.append("🔧 Self-Healed:")
        out.extend(f" {item}" for item in wb.healed)
    if wb.escalations:
        out.append("")
        out.append("⚠️ Need attention:")
        out.extend(f" → {item}" for item in wb.escalations)

    return "\n".join(out)
|
|
|
|
|
|
def main():
    """CLI entry point: assess wellbeing, persist it, then report it.

    The snapshot is always saved before any output is produced.
      * ``--json``: print the full snapshot as JSON.
      * ``--quiet``/``-q``: print the text report only when something needs
        attention (status other than "thriving", or pending escalations).
      * Otherwise the text report is always printed.
    """
    cli = argparse.ArgumentParser(description="Needs System — Self-Monitoring")
    cli.add_argument("--json", action="store_true")
    cli.add_argument("--quiet", "-q", action="store_true")
    opts = cli.parse_args()

    snapshot = assess_wellbeing()
    save_wellbeing(snapshot)

    if opts.json:
        print(json.dumps(asdict(snapshot), indent=2, ensure_ascii=False))
        return

    # In quiet mode, stay silent only when everything is fine.
    nothing_to_report = opts.quiet and snapshot.status == "thriving" and not snapshot.escalations
    if not nothing_to_report:
        print(format_status(snapshot))


if __name__ == "__main__":
    main()
|