darkplex-core/cortex/needs.py
Claudia fda607c204
All checks were successful
Tests / test (push) Successful in 4s
fix: sync missing import os + stray } from darkplex-core PR #2 (YesMan)
2026-02-11 20:25:29 +01:00

327 lines
10 KiB
Python

#!/usr/bin/env python3
"""Needs System — Self-Monitoring & Self-Healing Loop.
Monitors functional needs: context, health, energy, connection, growth.
Each need has sensors, thresholds, self-heal actions, and escalation.
Usage:
cortex needs [--json] [--quiet]
"""
import argparse
import json
import os
import subprocess
import sys
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional
from cortex.config import cortex_home, memory_dir
@dataclass
class Need:
name: str
level: float # 0.0 (critical) to 1.0 (fully satisfied)
status: str # "satisfied", "low", "critical"
details: str
healed: list = field(default_factory=list)
escalate: list = field(default_factory=list)
@dataclass
class Wellbeing:
timestamp: str
overall: float
status: str
needs: dict
healed: list
escalations: list
history_trend: str
def _run_cmd(cmd, timeout=10):
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
return r.returncode, r.stdout, r.stderr
except subprocess.TimeoutExpired:
return -1, "", "timeout"
except Exception as e:
return -1, "", str(e)
def _try_heal(action_name, cmd, timeout=30):
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
if r.returncode == 0:
return True, f"{action_name}: success"
return False, f"{action_name}: failed ({r.stderr[:80]})"
except Exception as e:
return False, f"{action_name}: {e}"
def _classify(score):
return "satisfied" if score > 0.7 else "low" if score > 0.3 else "critical"
def wellbeing_file():
return memory_dir() / "wellbeing.json"
def sense_context():
score, details, healed, escalate = 1.0, [], [], []
mem = memory_dir()
working = mem / "WORKING.md"
if working.exists():
age_h = (datetime.now().timestamp() - working.stat().st_mtime) / 3600
if age_h > 8:
score -= 0.3
details.append(f"WORKING.md stale ({age_h:.0f}h)")
elif age_h > 4:
score -= 0.1
details.append(f"WORKING.md somewhat old ({age_h:.1f}h)")
else:
score -= 0.4
details.append("WORKING.md missing")
boot_ctx = mem / "BOOT_CONTEXT.md"
if boot_ctx.exists():
age_h = (datetime.now().timestamp() - boot_ctx.stat().st_mtime) / 3600
if age_h > 4:
score -= 0.15
details.append(f"BOOT_CONTEXT.md stale ({age_h:.0f}h)")
else:
score -= 0.2
details.append("BOOT_CONTEXT.md missing")
home = cortex_home()
memory_md = Path(os.environ.get("DARKPLEX_WORKSPACE", str(Path.home()))) / "MEMORY.md" if "cortex" not in str(home) else home / "MEMORY.md"
# Try standard location
for p in [Path(os.environ.get('DARKPLEX_WORKSPACE', os.environ.get('OPENCLAW_WORKSPACE', str(Path.home())))) / "MEMORY.md", home / "MEMORY.md"]:
if p.exists():
if p.stat().st_size < 100:
score -= 0.3
details.append("MEMORY.md nearly empty")
break
else:
score -= 0.3
details.append("MEMORY.md not found")
if not details:
details.append("Context complete and fresh")
return Need("context", max(0.0, score), _classify(max(0.0, score)),
"; ".join(details), healed, escalate)
def sense_health():
score, details, healed, escalate = 1.0, [], [], []
rc, out, _ = _run_cmd(["systemctl", "--user", "is-active", os.environ.get("DARKPLEX_EVENT_SERVICE", "event-consumer")])
if out.strip() != "active":
score -= 0.3
details.append("Event Consumer inactive")
nats_bin = str(Path.home() / "bin" / "nats")
rc, out, _ = _run_cmd([nats_bin, "server", "check", "connection"])
if rc != 0:
score -= 0.4
details.append("NATS unreachable")
escalate.append("NATS Server down")
rc, out, _ = _run_cmd(["df", "--output=pcent", "/"])
if rc == 0:
lines = out.strip().split('\n')
if len(lines) >= 2:
usage = int(lines[1].strip().rstrip('%'))
if usage > 90:
score -= 0.3
details.append(f"Disk {usage}% full")
elif usage > 80:
score -= 0.1
details.append(f"Disk {usage}% full")
if not details:
details.append("All systems healthy")
return Need("health", max(0.0, score), _classify(max(0.0, score)),
"; ".join(details), healed, escalate)
def sense_energy():
score, details = 1.0, []
total_ctx_bytes = 0
clawd = Path(os.environ.get('DARKPLEX_WORKSPACE', os.environ.get('OPENCLAW_WORKSPACE', str(Path.home()))))
for f in ["SOUL.md", "AGENTS.md", "TOOLS.md", "MEMORY.md", "USER.md"]:
p = clawd / f
if p.exists():
total_ctx_bytes += p.stat().st_size
total_ctx_kb = total_ctx_bytes / 1024
if total_ctx_kb > 30:
score -= 0.2
details.append(f"Workspace files {total_ctx_kb:.0f}KB — context pressure")
else:
details.append(f"Workspace files {total_ctx_kb:.0f}KB — efficient")
if not details:
details.append("Energy budget good")
return Need("energy", max(0.0, score), _classify(max(0.0, score)),
"; ".join(details), [], [])
def sense_connection():
score, details = 1.0, []
hour = datetime.now().hour
if 23 <= hour or hour < 8:
details.append("Night mode — no interaction expected")
else:
details.append("Connection status normal")
return Need("connection", max(0.0, score), _classify(max(0.0, score)),
"; ".join(details), [], [])
def sense_growth():
score, details = 1.0, []
rag_db = Path(os.environ.get('DARKPLEX_WORKSPACE', os.environ.get('OPENCLAW_WORKSPACE', str(Path.home())))) / ".rag-db" / "chroma.sqlite3"
if rag_db.exists():
age_days = (datetime.now().timestamp() - rag_db.stat().st_mtime) / 86400
if age_days > 14:
score -= 0.3
details.append(f"RAG Index {age_days:.0f} days old")
elif age_days > 7:
score -= 0.1
details.append(f"RAG Index {age_days:.0f} days old")
else:
details.append(f"RAG Index fresh ({age_days:.1f} days)")
else:
score -= 0.3
details.append("RAG DB missing")
if not details:
details.append("Growth normal")
return Need("growth", max(0.0, score), _classify(max(0.0, score)),
"; ".join(details), [], [])
def assess_wellbeing() -> Wellbeing:
needs = {}
all_healed, all_escalations = [], []
for sensor in [sense_context, sense_health, sense_energy, sense_connection, sense_growth]:
try:
need = sensor()
needs[need.name] = need
all_healed.extend(need.healed)
all_escalations.extend(need.escalate)
except Exception as e:
name = sensor.__name__.replace("sense_", "")
needs[name] = Need(name, 0.5, "unknown", f"Sensor error: {e}")
weights = {"context": 0.3, "health": 0.3, "energy": 0.15, "connection": 0.1, "growth": 0.15}
overall = sum(needs[n].level * weights.get(n, 0.2) for n in needs) / sum(weights.get(n, 0.2) for n in needs)
if overall > 0.8:
status = "thriving"
elif overall > 0.6:
status = "okay"
elif overall > 0.3:
status = "struggling"
else:
status = "critical"
trend = _compute_trend(overall)
return Wellbeing(
timestamp=datetime.now().isoformat(),
overall=round(overall, 2),
status=status,
needs={n: asdict(need) for n, need in needs.items()},
healed=[h for h in all_healed if h],
escalations=all_escalations,
history_trend=trend,
)
def _compute_trend(current):
try:
wf = wellbeing_file()
if wf.exists():
history = json.loads(wf.read_text())
past = [h.get("overall", 0.5) for h in history.get("history", [])]
if past:
avg = sum(past[-5:]) / len(past[-5:])
if current > avg + 0.1:
return "improving"
elif current < avg - 0.1:
return "declining"
return "stable"
except Exception:
return "unknown"
def save_wellbeing(wb: Wellbeing):
data = asdict(wb)
wf = wellbeing_file()
wf.parent.mkdir(parents=True, exist_ok=True)
history = []
if wf.exists():
try:
history = json.loads(wf.read_text()).get("history", [])
except Exception:
pass
history.append({"timestamp": wb.timestamp, "overall": wb.overall, "status": wb.status})
data["history"] = history[-48:]
wf.write_text(json.dumps(data, indent=2, ensure_ascii=False))
def format_status(wb: Wellbeing) -> str:
emoji = {"thriving": "🌟", "okay": "😊", "struggling": "😟", "critical": "🆘"}.get(wb.status, "")
trend_emoji = {"improving": "📈", "stable": "➡️", "declining": "📉"}.get(wb.history_trend, "")
lines = [f"{emoji} Wellbeing: {wb.overall:.0%} ({wb.status}) {trend_emoji} {wb.history_trend}", ""]
need_emoji = {"context": "🧠", "health": "💊", "energy": "", "connection": "💬", "growth": "🌱"}
for name, data in wb.needs.items():
ne = need_emoji.get(name, "")
bar = "" * int(data["level"] * 10) + "" * (10 - int(data["level"] * 10))
lines.append(f" {ne} {name:12s} [{bar}] {data['level']:.0%}{data['details']}")
if wb.healed:
lines.extend(["", "🔧 Self-Healed:"] + [f" {h}" for h in wb.healed])
if wb.escalations:
lines.extend(["", "⚠️ Need attention:"] + [f"{e}" for e in wb.escalations])
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Needs System — Self-Monitoring")
parser.add_argument("--json", action="store_true")
parser.add_argument("--quiet", "-q", action="store_true")
args = parser.parse_args()
wb = assess_wellbeing()
save_wellbeing(wb)
if args.json:
print(json.dumps(asdict(wb), indent=2, ensure_ascii=False))
elif args.quiet:
if wb.status != "thriving" or wb.escalations:
print(format_status(wb))
else:
print(format_status(wb))
if __name__ == "__main__":
main()