From 0484c6321a5b2ca08e544820f3a272010c683d5a Mon Sep 17 00:00:00 2001 From: Claudia Date: Fri, 13 Feb 2026 11:52:25 +0100 Subject: [PATCH] feat(memory): add session memory persistence module New cortex/memory/ module that provides: - boot_assembler: builds BOOTSTRAP.md from threads, decisions, narrative - thread_tracker: tracks conversation threads across sessions via NATS - narrative_generator: daily narrative with Ollama LLM (fallback: structured) - pre_compaction: snapshot pipeline before context compaction CLI commands: - cortex memory bootstrap [--dry-run] [--workspace DIR] - cortex memory snapshot [--workspace DIR] - cortex memory threads [--summary] [--hours N] All paths configurable via WORKSPACE_DIR, NATS_URL, AGENT_NAME env vars. No hardcoded paths. Works with any OpenClaw agent. Fixes array/dict handling for empty threads.json and decisions.json. --- cortex/cli.py | 26 +++ cortex/intelligence/loop.py | 2 +- cortex/memory/__init__.py | 1 + cortex/memory/boot_assembler.py | 290 +++++++++++++++++++++++++++ cortex/memory/common.py | 122 +++++++++++ cortex/memory/narrative_generator.py | 171 ++++++++++++++++ cortex/memory/pre_compaction.py | 110 ++++++++++ cortex/memory/thread_tracker.py | 267 ++++++++++++++++++++++++ 8 files changed, 988 insertions(+), 1 deletion(-) create mode 100644 cortex/memory/__init__.py create mode 100644 cortex/memory/boot_assembler.py create mode 100644 cortex/memory/common.py create mode 100644 cortex/memory/narrative_generator.py create mode 100644 cortex/memory/pre_compaction.py create mode 100644 cortex/memory/thread_tracker.py diff --git a/cortex/cli.py b/cortex/cli.py index f7cd196..f96dabd 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -22,6 +22,9 @@ Usage: cortex anomaly [--hours N] [--json] cortex predict [--learn] [--patterns] cortex monitor [--json] + cortex memory bootstrap [--dry-run] [--max-tokens N] + cortex memory snapshot [--dry-run] + cortex memory threads [--summary] [--hours N] cortex version """ @@ -121,6 +124,29 @@ def main(): from cortex.monitor import main as monitor_main monitor_main() + elif cmd == "memory": + if len(sys.argv) < 2: + print("Usage: cortex memory bootstrap|snapshot|threads [options]") + sys.exit(1) + subcmd = sys.argv[1] if len(sys.argv) > 1 else "" + sys.argv = [f"cortex memory {subcmd}"] + sys.argv[2:] + if subcmd == "bootstrap": + from cortex.memory.boot_assembler import main as boot_main + boot_main() + elif subcmd == "snapshot": + from cortex.memory.pre_compaction import main as snap_main + snap_main() + elif subcmd == "threads": + # Default to --show mode for display + if "--hours" not in sys.argv and "--dry-run" not in sys.argv: + sys.argv.append("--show") + from cortex.memory.thread_tracker import main as thread_main + thread_main() + else: + print(f"Unknown memory subcommand: {subcmd}") + print("Available: bootstrap, snapshot, threads") + sys.exit(1) + elif cmd in ("-h", "--help", "help"): print(__doc__.strip()) diff --git a/cortex/intelligence/loop.py b/cortex/intelligence/loop.py index 98930af..b463667 100644 --- a/cortex/intelligence/loop.py +++ b/cortex/intelligence/loop.py @@ -309,7 +309,7 @@ def step_extract(state: LoopState, events: list) -> dict: spec.loader.exec_module(em) # Try LLM batch extraction first - from llm_extractor import extract_entities_llm_batch, is_available as llm_available + from cortex.llm_extractor import extract_entities_llm_batch, is_available as llm_available use_llm = os.environ.get("DARKPLEX_EXTRACTOR", "auto").lower() in ("llm", "auto") llm_ok = use_llm and llm_available() if llm_ok: diff --git a/cortex/memory/__init__.py b/cortex/memory/__init__.py new file mode 100644 index 0000000..268ed01 --- /dev/null +++ b/cortex/memory/__init__.py @@ -0,0 +1 @@ +"""Memory module — session memory persistence for OpenClaw agents.""" diff --git a/cortex/memory/boot_assembler.py b/cortex/memory/boot_assembler.py new file mode 100644 index 0000000..fadb6ff --- /dev/null +++ b/cortex/memory/boot_assembler.py @@ -0,0 +1,290 @@ +"""Boot Assembler — Query-driven boot context generator. + +Reads threads, decisions, narrative, and knowledge to assemble a dense BOOTSTRAP.md. +All paths derived from WORKSPACE_DIR. No hardcoded paths. +""" + +import json +import os +import subprocess +from datetime import datetime, timezone, timedelta +from pathlib import Path + +from .common import ( + get_workspace_dir, get_reboot_dir, get_agent_name, + load_json, load_facts, +) + +DEFAULT_MAX_CHARS = 16000 # ~4000 tokens + + +def _load_threads_data(reboot_dir: Path) -> dict: + data = load_json(reboot_dir / "threads.json") + if isinstance(data, list): + return {"threads": data} + return data if isinstance(data, dict) else {} + + +def _get_open_threads(reboot_dir: Path, limit: int = 7) -> list[dict]: + data = _load_threads_data(reboot_dir) + threads = [t for t in data.get("threads", []) if t.get("status") == "open"] + priority_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} + threads.sort(key=lambda t: ( + priority_order.get(t.get("priority", "low"), 3), + -(datetime.fromisoformat( + t.get("last_activity", "2000-01-01T00:00:00Z").replace("Z", "+00:00") + ).timestamp()) + )) + return threads[:limit] + + +def _integrity_warning(reboot_dir: Path) -> str: + data = _load_threads_data(reboot_dir) + integrity = data.get("integrity", {}) + last_ts = integrity.get("last_nats_timestamp") + if not last_ts: + return "⚠️ No integrity data — thread tracker may not have run yet." + try: + if len(last_ts) <= 10: + last_dt = datetime.strptime(last_ts, "%Y-%m-%d").replace(tzinfo=timezone.utc) + else: + last_dt = datetime.fromisoformat(last_ts.replace("Z", "+00:00")) + age_min = (datetime.now(timezone.utc) - last_dt).total_seconds() / 60 + if age_min > 480: + return f"🚨 STALE DATA: Thread data is {age_min/60:.0f}h old." + elif age_min > 120: + return f"⚠️ Data staleness: Thread data is {age_min/60:.0f}h old." + return "" + except Exception: + return "⚠️ Could not parse integrity timestamp." + + +def _load_hot_snapshot(reboot_dir: Path) -> str: + f = reboot_dir / "hot-snapshot.md" + try: + if not f.exists(): + return "" + mtime = datetime.fromtimestamp(f.stat().st_mtime, tz=timezone.utc) + if datetime.now(timezone.utc) - mtime > timedelta(hours=1): + return "" + return f.read_text().strip()[:1000] + except Exception: + return "" + + +def _load_decisions(reboot_dir: Path) -> list[dict]: + data = load_json(reboot_dir / "decisions.json") + if isinstance(data, list): + data = {"decisions": data} + if not isinstance(data, dict): + return [] + cutoff = (datetime.now(timezone.utc) - timedelta(days=14)).strftime("%Y-%m-%d") + recent = [d for d in data.get("decisions", []) if d.get("date", "") >= cutoff] + return recent[-10:] + + +def _load_narrative(reboot_dir: Path) -> str: + f = reboot_dir / "narrative.md" + try: + if not f.exists(): + return "" + mtime = datetime.fromtimestamp(f.stat().st_mtime, tz=timezone.utc) + if datetime.now(timezone.utc) - mtime > timedelta(hours=36): + return "" + return f.read_text().strip()[:2000] + except Exception: + return "" + + +def _query_knowledge_for_thread(thread: dict, facts: list[dict]) -> list[str]: + """Score facts by keyword overlap with thread.""" + results = [] + query_terms = thread.get("title", "") + " " + thread.get("summary", "") + query_words = set(query_terms.lower().split()) + + scored = [] + for fact in facts: + text = fact.get("text", "").lower() + priority = fact.get("priority", "normal") + boost = {"critical": 4, "high": 2, "normal": 1, "low": 0.5}.get(priority, 1) + overlap = len(query_words & set(text.split())) + if overlap > 0: + score = overlap * boost + created = fact.get("created", "") + if created and created[:10] >= (datetime.now(timezone.utc) - timedelta(days=7)).strftime("%Y-%m-%d"): + score *= 1.5 + scored.append((score, fact)) + + scored.sort(key=lambda x: -x[0]) + for score, fact in scored[:3]: + prio = fact.get("priority", "?") + conf = fact.get("confidence", 1.0) + text = fact.get("text", "")[:150] + results.append(f" [{prio}|{conf:.0%}] {text}") + + return results[:5] + + +def _get_execution_mode() -> str: + hour = datetime.now().hour + if 6 <= hour < 12: + return "Morning — brief, directive, efficient" + elif 12 <= hour < 18: + return "Afternoon — execution mode" + elif 18 <= hour < 22: + return "Evening — strategic, philosophical possible" + return "Night — emergencies only" + + +def assemble(workspace: Path = None, max_chars: int = DEFAULT_MAX_CHARS, + facts_file: Path = None, calendar_cmd: list[str] = None, + wellbeing_file: Path = None) -> str: + """Assemble BOOTSTRAP.md content. + + Args: + workspace: Workspace directory (default: WORKSPACE_DIR or cwd) + max_chars: Character budget + facts_file: Path to facts.jsonl for knowledge queries + calendar_cmd: Command to run for calendar events (optional) + wellbeing_file: Path to wellbeing.json (optional) + """ + ws = workspace or get_workspace_dir() + reboot_dir = get_reboot_dir(ws) + agent = get_agent_name() + now = datetime.now(timezone.utc) + local_now = datetime.now() + parts = [] + + parts.append(f"# Context Briefing") + parts.append(f"Agent: {agent} | Generated: {now.isoformat()[:19]}Z | Local: {local_now.strftime('%H:%M')}") + parts.append("") + + # State + parts.append("## ⚡ State") + parts.append(f"Mode: {_get_execution_mode()}") + + # Wellbeing (optional) + if wellbeing_file: + wb = load_json(wellbeing_file) + if wb: + parts.append(f"Wellbeing: {wb.get('status', '?')} ({wb.get('overall', 0):.0%}) trend:{wb.get('history_trend', '?')}") + + # Session mood + td = _load_threads_data(reboot_dir) + mood = td.get("session_mood", "neutral") + if mood != "neutral": + emoji = {"frustrated": "😤", "excited": "🔥", "tense": "⚡", "productive": "🔧", "exploratory": "🔬"}.get(mood, "") + parts.append(f"Last session mood: {mood} {emoji}") + + warning = _integrity_warning(reboot_dir) + if warning: + parts.append(f"\n{warning}") + + # Calendar (optional external command) + if calendar_cmd: + try: + result = subprocess.run(calendar_cmd, capture_output=True, text=True, timeout=10) + if result.returncode == 0 and result.stdout.strip(): + parts.append(f"\n### 📅 Today") + parts.append("\n".join(result.stdout.strip().split("\n")[:10])) + except Exception: + pass + parts.append("") + + # Hot snapshot + hot = _load_hot_snapshot(reboot_dir) + if hot: + parts.append("## 🔥 Last Session Snapshot") + parts.append(hot) + parts.append("") + + # Narrative + narrative = _load_narrative(reboot_dir) + if narrative: + parts.append("## 📖 Narrative (last 24h)") + parts.append(narrative) + parts.append("") + + # Threads + knowledge + threads = _get_open_threads(reboot_dir) + all_facts = load_facts(facts_file) if facts_file and facts_file.exists() else [] + + if threads: + parts.append("## 🧵 Active Threads") + for t in threads: + prio_emoji = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🔵"}.get(t.get("priority"), "⚪") + mood_tag = f" [{t.get('mood', '')}]" if t.get("mood", "neutral") != "neutral" else "" + parts.append(f"\n### {prio_emoji} {t['title']}{mood_tag}") + parts.append(f"Priority: {t.get('priority', '?')} | Last: {t.get('last_activity', '?')[:16]}") + parts.append(f"Summary: {t.get('summary', 'no summary')}") + if t.get("waiting_for"): + parts.append(f"⏳ Waiting for: {t['waiting_for']}") + if t.get("decisions"): + parts.append(f"Decisions: {', '.join(t['decisions'])}") + if all_facts: + knowledge = _query_knowledge_for_thread(t, all_facts) + if knowledge: + parts.append("Knowledge:") + parts.extend(knowledge) + parts.append("") + + # Decisions + decisions = _load_decisions(reboot_dir) + if decisions: + parts.append("## 🎯 Recent Decisions") + for d in decisions: + ie = {"critical": "🔴", "high": "🟠", "medium": "🟡"}.get(d.get("impact"), "⚪") + parts.append(f"- {ie} **{d['what']}** ({d.get('date', '?')})") + if d.get("why"): + parts.append(f" Why: {d['why'][:100]}") + parts.append("") + + # Footer + parts.append("---") + parts.append(f"_Boot context | {len(threads)} active threads | {len(decisions)} recent decisions_") + + result = "\n".join(parts) + if len(result) > max_chars: + result = result[:max_chars] + "\n\n_[truncated to token budget]_" + return result + + +def run(dry_run: bool = False, max_tokens: int = 4000, workspace: Path = None, + facts_file: Path = None, calendar_cmd: list[str] = None, + wellbeing_file: Path = None): + """Run boot assembler.""" + ws = workspace or get_workspace_dir() + output_file = ws / "BOOTSTRAP.md" + + bootstrap = assemble( + workspace=ws, + max_chars=max_tokens * 4, + facts_file=facts_file, + calendar_cmd=calendar_cmd, + wellbeing_file=wellbeing_file, + ) + + if dry_run: + print(bootstrap) + print(f"\n--- Stats: {len(bootstrap)} chars, ~{len(bootstrap)//4} tokens ---") + else: + output_file.write_text(bootstrap) + print(f"✅ BOOTSTRAP.md written ({len(bootstrap)} chars, ~{len(bootstrap)//4} tokens)") + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="Boot Assembler — Query-driven boot context") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--max-tokens", type=int, default=4000) + parser.add_argument("--workspace", type=str, help="Workspace directory") + parser.add_argument("--facts-file", type=str, help="Path to facts.jsonl") + args = parser.parse_args() + + ws = Path(args.workspace) if args.workspace else None + ff = Path(args.facts_file) if args.facts_file else None + run(dry_run=args.dry_run, max_tokens=args.max_tokens, workspace=ws, facts_file=ff) + + +if __name__ == "__main__": + main() diff --git a/cortex/memory/common.py b/cortex/memory/common.py new file mode 100644 index 0000000..cb39d18 --- /dev/null +++ b/cortex/memory/common.py @@ -0,0 +1,122 @@ +"""Shared utilities for the memory module — NATS access, path helpers, JSON loading.""" + +import json +import logging +import os +from datetime import datetime, timezone +from pathlib import Path + + +def get_workspace_dir() -> Path: + """Get workspace directory from WORKSPACE_DIR env or cwd.""" + return Path(os.environ.get("WORKSPACE_DIR", os.getcwd())) + + +def get_agent_name() -> str: + """Get agent name from AGENT_NAME env or 'agent'.""" + return os.environ.get("AGENT_NAME", "agent") + + +def get_reboot_dir(workspace: Path = None) -> Path: + """Get memory/reboot directory, creating if needed.""" + ws = workspace or get_workspace_dir() + d = ws / "memory" / "reboot" + d.mkdir(parents=True, exist_ok=True) + return d + + +def get_nats_credentials(workspace: Path = None) -> dict: + """Load NATS credentials from env vars or config file. + + Priority: env vars > config file at WORKSPACE_DIR/config/nats/credentials.env + Returns dict with keys: url, user, password + """ + url = os.environ.get("NATS_URL", "") + user = os.environ.get("NATS_USER", "") + password = os.environ.get("NATS_PASSWORD", "") + + if not (url and user and password): + ws = workspace or get_workspace_dir() + creds_file = ws / "config" / "nats" / "credentials.env" + if creds_file.exists(): + for line in creds_file.read_text().strip().split("\n"): + if "=" in line and not line.startswith("#"): + k, v = line.split("=", 1) + k, v = k.strip(), v.strip().strip('"') + if k == "NATS_URL" and not url: + url = v + elif k in ("NATS_USER",) and not user: + user = v + elif k in ("NATS_PASSWORD", "NATS_CLAUDIA_PW") and not password: + password = v + + return { + "url": url or "nats://localhost:4222", + "user": user, + "password": password, + } + + +def get_nats_env(workspace: Path = None) -> dict: + """Return os.environ copy with NATS credentials set for nats CLI.""" + creds = get_nats_credentials(workspace) + env = os.environ.copy() + if creds["user"]: + env["NATS_USER"] = creds["user"] + if creds["password"]: + env["NATS_PASSWORD"] = creds["password"] + if creds["url"]: + env["NATS_URL"] = creds["url"] + return env + + +def load_json(path: Path) -> dict: + """Load JSON file, returning empty dict on failure.""" + try: + return json.loads(path.read_text()) + except (FileNotFoundError, json.JSONDecodeError): + return {} + + +def save_json(path: Path, data: dict): + """Atomically write JSON to file.""" + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(".tmp") + tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False)) + tmp.rename(path) + + +def load_facts(path: Path) -> list[dict]: + """Load facts from a JSONL file.""" + if not path.exists(): + return [] + facts = [] + for line in path.read_text().strip().split("\n"): + if not line.strip(): + continue + try: + fact = json.loads(line) + if "text" not in fact and "fact" in fact: + fact["text"] = fact["fact"] + facts.append(fact) + except json.JSONDecodeError: + continue + return facts + + +def setup_logging(name: str, workspace: Path = None) -> logging.Logger: + """Configure logging to workspace/logs/ and stderr.""" + ws = workspace or get_workspace_dir() + log_dir = ws / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + log_file = log_dir / f"{name}.log" + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[ + logging.FileHandler(log_file), + logging.StreamHandler(), + ], + ) + return logging.getLogger(name) diff --git a/cortex/memory/narrative_generator.py b/cortex/memory/narrative_generator.py new file mode 100644 index 0000000..0db521f --- /dev/null +++ b/cortex/memory/narrative_generator.py @@ -0,0 +1,171 @@ +"""Narrative Generator — Creates a human-readable story from recent activity. + +Reads daily notes and thread/decision data, compresses into a narrative. +Optionally uses a local LLM (Ollama) for richer narratives with structured fallback. +""" + +import json +import os +import subprocess +import sys +from datetime import datetime, timezone, timedelta +from pathlib import Path + +from .common import get_workspace_dir, get_reboot_dir, get_agent_name, load_json + +# Configurable via env +OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "mistral:7b") +OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434") + + +def load_daily_notes(workspace: Path, hours: int = 24) -> str: + """Load relevant daily notes.""" + parts = [] + today = datetime.now().strftime("%Y-%m-%d") + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") + + for date in [yesterday, today]: + note_file = workspace / "memory" / f"{date}.md" + if note_file.exists(): + parts.append(f"## {date}\n{note_file.read_text()[:4000]}") + + return "\n\n".join(parts) + + +def load_threads(workspace: Path) -> list[dict]: + """Load threads for context.""" + data = load_json(get_reboot_dir(workspace) / "threads.json") + return data.get("threads", []) + + +def load_decisions(workspace: Path) -> list[dict]: + """Load recent decisions.""" + data = load_json(get_reboot_dir(workspace) / "decisions.json") + today = datetime.now().strftime("%Y-%m-%d") + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") + return [d for d in data.get("decisions", []) if d.get("date", "") >= yesterday] + + +def generate_structured(notes: str, threads: list[dict], decisions: list[dict]) -> str: + """Generate narrative without LLM — structured approach.""" + now = datetime.now() + parts = [f"*{now.strftime('%A, %d. %B %Y')} — Narrative*\n"] + + open_threads = [t for t in threads if t.get("status") == "open"] + closed_threads = [t for t in threads if t.get("status") == "closed" + and t.get("last_activity", "")[:10] >= (now - timedelta(days=1)).strftime("%Y-%m-%d")] + + if closed_threads: + parts.append("**Completed:**") + for t in closed_threads: + parts.append(f"- ✅ {t['title']}: {t.get('summary', '')[:100]}") + parts.append("") + + if open_threads: + parts.append("**Open:**") + for t in open_threads: + prio = t.get("priority", "?") + parts.append(f"- {'🔴' if prio == 'critical' else '🟡'} {t['title']}: {t.get('summary', '')[:150]}") + if t.get("waiting_for"): + parts.append(f" ⏳ {t['waiting_for']}") + parts.append("") + + if decisions: + parts.append("**Decisions:**") + for d in decisions: + parts.append(f"- {d['what']} — {d.get('why', '')[:80]}") + parts.append("") + + if notes: + parts.append("**Timeline:**") + for line in notes.split("\n"): + line = line.strip() + if line.startswith("## ") and not line.startswith("## 20"): + parts.append(f"- {line[3:]}") + elif line.startswith("### "): + parts.append(f" - {line[4:]}") + parts.append("") + + return "\n".join(parts) + + +def generate_llm(notes: str, threads: list[dict], decisions: list[dict], + model: str = None) -> str | None: + """Generate narrative using local Ollama LLM. Returns None on failure.""" + agent = get_agent_name() + model = model or OLLAMA_MODEL + + context = f"""You are a memory system for an AI agent named {agent}. Write a brief, dense narrative (max 500 words) of what happened in the last 24 hours. Be specific — names, decisions, outcomes. No fluff. + +Daily Notes: +{notes[:3000]} + +Open Threads: +{json.dumps([{"title": t["title"], "summary": t.get("summary", ""), "priority": t.get("priority")} for t in threads if t.get("status") == "open"], indent=2)[:1000]} + +Decisions: +{json.dumps(decisions[:5], indent=2)[:500]} + +Write the narrative now. Be concise.""" + + try: + result = subprocess.run( + ["ollama", "run", model, context], + capture_output=True, text=True, timeout=60 + ) + if result.returncode == 0 and len(result.stdout.strip()) > 50: + return result.stdout.strip()[:2000] + except Exception: + pass + + return None + + +def run(hours: int = 24, dry_run: bool = False, no_llm: bool = False, + model: str = None, workspace: Path = None) -> str: + """Run narrative generator. Returns the narrative text.""" + ws = workspace or get_workspace_dir() + reboot_dir = get_reboot_dir(ws) + narrative_file = reboot_dir / "narrative.md" + + print("📖 Narrative Generator — building story...") + + notes = load_daily_notes(ws, hours) + threads = load_threads(ws) + decisions = load_decisions(ws) + + narrative = None + if not no_llm: + narrative = generate_llm(notes, threads, decisions, model=model) + if narrative: + print(" Used LLM for narrative") + + if not narrative: + narrative = generate_structured(notes, threads, decisions) + print(" Used structured narrative (no LLM)") + + if dry_run: + print("\n" + narrative) + else: + narrative_file.write_text(narrative) + print(f" ✅ narrative.md written ({len(narrative)} chars)") + + return narrative + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="Narrative Generator") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--hours", type=int, default=24) + parser.add_argument("--no-llm", action="store_true", help="Skip LLM, use structured only") + parser.add_argument("--model", type=str, default=None, help=f"Ollama model (default: {OLLAMA_MODEL})") + parser.add_argument("--workspace", type=str, help="Workspace directory") + args = parser.parse_args() + + ws = Path(args.workspace) if args.workspace else None + run(hours=args.hours, dry_run=args.dry_run, no_llm=args.no_llm, model=args.model, workspace=ws) + + +if __name__ == "__main__": + main() diff --git a/cortex/memory/pre_compaction.py b/cortex/memory/pre_compaction.py new file mode 100644 index 0000000..b4b4e4f --- /dev/null +++ b/cortex/memory/pre_compaction.py @@ -0,0 +1,110 @@ +"""Pre-Compaction Snapshot — Captures the "hot zone" before memory loss. + +Orchestrates: thread tracker → hot snapshot → narrative → boot assembler. +""" + +import json +import subprocess +from datetime import datetime, timezone +from pathlib import Path + +from .common import get_workspace_dir, get_reboot_dir, get_nats_env +from .thread_tracker import run as run_thread_tracker +from .narrative_generator import run as run_narrative +from .boot_assembler import run as run_boot_assembler + + +def fetch_recent_messages(count: int = 20, workspace: Path = None) -> list[str]: + """Fetch last N messages from NATS for snapshot.""" + ws = workspace or get_workspace_dir() + env = get_nats_env(ws) + messages = [] + + try: + result = subprocess.run( + ["nats", "stream", "get", "openclaw-events", "--last", str(count), "--raw"], + capture_output=True, text=True, timeout=15, env=env + ) + if result.returncode == 0 and result.stdout.strip(): + for line in result.stdout.strip().split("\n"): + try: + evt = json.loads(line) + content = evt.get("content", evt.get("message", evt.get("text", ""))) + sender = evt.get("sender", evt.get("agent", evt.get("role", "?"))) + if content and len(content.strip()) > 3: + short = content.strip()[:200] + if len(content.strip()) > 200: + short += "..." + messages.append(f"[{sender}] {short}") + except json.JSONDecodeError: + continue + except Exception as e: + messages.append(f"(NATS fetch failed: {e})") + + return messages + + +def build_snapshot(messages: list[str]) -> str: + """Build the hot snapshot markdown.""" + now = datetime.now(timezone.utc) + parts = [ + f"# Hot Snapshot — {now.isoformat()[:19]}Z", + "## Last ~30min before compaction", + "", + ] + if messages: + parts.append("**Recent conversation:**") + for msg in messages[-15:]: + parts.append(f"- {msg}") + else: + parts.append("(No recent messages captured)") + parts.append("") + return "\n".join(parts) + + +def run(dry_run: bool = False, workspace: Path = None, **assembler_kwargs): + """Run the full pre-compaction pipeline.""" + ws = workspace or get_workspace_dir() + reboot_dir = get_reboot_dir(ws) + hot_snapshot_file = reboot_dir / "hot-snapshot.md" + + print("🔥 Pre-Compaction Snapshot — capturing hot zone...") + + # 1. Thread tracker (last 1h) + print(" 1/4 Thread tracker (last 1h)...") + run_thread_tracker(hours=1, workspace=ws) + + # 2. Hot snapshot + print(" 2/4 Capturing recent messages...") + messages = fetch_recent_messages(20, ws) + snapshot = build_snapshot(messages) + if dry_run: + print(snapshot) + else: + hot_snapshot_file.write_text(snapshot) + print(f" ✅ hot-snapshot.md written ({len(snapshot)} chars)") + + # 3. Narrative (no LLM for speed during compaction) + print(" 3/4 Narrative generator...") + run_narrative(no_llm=True, workspace=ws) + + # 4. Boot assembler + print(" 4/4 Boot assembler...") + run_boot_assembler(workspace=ws, **assembler_kwargs) + + print("🔥 Pre-Compaction Snapshot — done!") + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="Pre-Compaction Snapshot") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--workspace", type=str, help="Workspace directory") + args = parser.parse_args() + + ws = Path(args.workspace) if args.workspace else None + run(dry_run=args.dry_run, workspace=ws) + + +if __name__ == "__main__": + main() diff --git a/cortex/memory/thread_tracker.py b/cortex/memory/thread_tracker.py new file mode 100644 index 0000000..c2a6ba6 --- /dev/null +++ b/cortex/memory/thread_tracker.py @@ -0,0 +1,267 @@ +"""Thread Tracker — Tracks open conversation threads from NATS events. + +Reads recent NATS events and extracts/updates conversation threads. +Thread detection heuristics: topic shifts, decision markers, status markers, waiting markers. +""" + +import json +import re +import subprocess +import sys +from datetime import datetime, timezone, timedelta +from pathlib import Path + +from .common import get_workspace_dir, get_reboot_dir, get_nats_env, load_json, save_json + +# Patterns for thread detection +DECISION_PATTERNS = [ + r"(?:decided|entschieden|decision|beschlossen|agreed|let'?s do|machen wir|wir machen)", + r"(?:the plan is|der plan ist|approach:|ansatz:)", +] +CLOSE_PATTERNS = [ + r"(?:done|erledigt|fixed|gefixt|gelöst|solved|closed|fertig|works|funktioniert|✅)", +] +WAIT_PATTERNS = [ + r"(?:waiting for|warte auf|blocked by|blockiert durch|need.*first|brauche.*erst)", +] +TOPIC_PATTERNS = [ + r"(?:back to|zurück zu|jetzt zu|now about|regarding|bzgl\.?|wegen)\s+(\w[\w\s-]{2,30})", +] + +MOOD_PATTERNS = { + "frustrated": r"(?:fuck|shit|mist|nervig|genervt|damn|wtf|argh|schon wieder|zum kotzen|sucks)", + "excited": r"(?:geil|nice|awesome|krass|boom|läuft|yes!|🎯|🚀|perfekt|brilliant|mega|sick)", + "tense": r"(?:vorsicht|careful|risky|heikel|kritisch|dringend|urgent|achtung|gefährlich)", + "productive": r"(?:erledigt|done|fixed|works|fertig|deployed|✅|gebaut|shipped|läuft)", + "exploratory": r"(?:was wäre wenn|what if|könnte man|idea|idee|maybe|vielleicht|experiment)", +} + + +def detect_mood(text: str) -> str: + """Detect overall mood from text. Last match wins.""" + if not text: + return "neutral" + text_lower = text.lower() + last_mood = "neutral" + last_pos = -1 + for mood, pattern in MOOD_PATTERNS.items(): + for m in re.finditer(pattern, text_lower): + if m.start() > last_pos: + last_pos = m.start() + last_mood = mood + return last_mood + + +def fetch_recent_events(hours: int = 4, workspace: Path = None) -> list[dict]: + """Fetch recent user/agent messages from NATS, with daily note fallback.""" + ws = workspace or get_workspace_dir() + env = get_nats_env(ws) + events = [] + + try: + result = subprocess.run( + ["nats", "stream", "get", "openclaw-events", "--last", "200", "--raw"], + capture_output=True, text=True, timeout=15, env=env + ) + if result.returncode == 0 and result.stdout.strip(): + for line in result.stdout.strip().split("\n"): + try: + events.append(json.loads(line)) + except json.JSONDecodeError: + continue + except Exception: + pass + + # Fallback: read from daily notes + if not events: + today = datetime.now().strftime("%Y-%m-%d") + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") + for date in [today, yesterday]: + note_file = ws / "memory" / f"{date}.md" + if note_file.exists(): + events.append({ + "type": "daily_note", + "date": date, + "content": note_file.read_text()[:5000] + }) + + return events + + +def extract_thread_signals(text: str) -> dict: + """Extract thread-related signals from text.""" + text_lower = text.lower() + signals = {"decisions": [], "closures": [], "waits": [], "topics": []} + + for pattern in DECISION_PATTERNS: + for m in re.finditer(pattern, text_lower): + start = max(0, m.start() - 50) + end = min(len(text), m.end() + 100) + signals["decisions"].append(text[start:end].strip()) + + for pattern in CLOSE_PATTERNS: + if re.search(pattern, text_lower): + signals["closures"].append(True) + + for pattern in WAIT_PATTERNS: + for m in re.finditer(pattern, text_lower): + start = m.start() + end = min(len(text), m.end() + 80) + signals["waits"].append(text[start:end].strip()) + + for pattern in TOPIC_PATTERNS: + for m in re.finditer(pattern, text_lower): + if m.group(1): + signals["topics"].append(m.group(1).strip()) + + return signals + + +def process_events(events: list[dict], existing_threads: list[dict]) -> list[dict]: + """Process events and update thread list.""" + threads = existing_threads.copy() + + for evt in events: + content = "" + if isinstance(evt, dict): + content = evt.get("content", evt.get("message", evt.get("text", ""))) + if not content and evt.get("type") == "daily_note": + content = evt.get("content", "") + if not content: + continue + + signals = extract_thread_signals(content) + + if signals["closures"]: + for t in threads: + if t["status"] == "open": + thread_words = set(t.get("title", "").lower().split()) + content_words = set(content.lower().split()) + if len(thread_words & content_words) >= 2: + t["status"] = "closed" + t["last_activity"] = datetime.now(timezone.utc).isoformat() + + for decision_ctx in signals["decisions"]: + for t in threads: + if t["status"] == "open": + thread_words = set(t.get("title", "").lower().split()) + decision_words = set(decision_ctx.lower().split()) + if len(thread_words & decision_words) >= 2: + if decision_ctx not in t.get("decisions", []): + t.setdefault("decisions", []).append(decision_ctx[:100]) + + return threads + + +def run(hours: int = 4, dry_run: bool = False, workspace: Path = None) -> dict: + """Run thread tracker. Returns the threads data dict.""" + ws = workspace or get_workspace_dir() + reboot_dir = get_reboot_dir(ws) + threads_file = reboot_dir / "threads.json" + + print(f"🧵 Thread Tracker — scanning last {hours}h of events...") + + data = load_json(threads_file) + existing_threads = data.get("threads", []) + + events = fetch_recent_events(hours, ws) + print(f" Found {len(events)} events") + + updated_threads = process_events(events, existing_threads) + + # Prune closed threads older than 7 days + cutoff = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat() + pruned = [t for t in updated_threads + if not (t.get("status") == "closed" and t.get("last_activity", "") < cutoff)] + + data["threads"] = pruned + data["updated"] = datetime.now(timezone.utc).isoformat() + data["version"] = 2 + + # Integrity tracking + last_nats_ts = None + last_nats_seq = None + for evt in reversed(events): + if isinstance(evt, dict): + if evt.get("seq"): + last_nats_seq = evt["seq"] + ts = evt.get("timestamp", evt.get("time", evt.get("date"))) + if ts: + last_nats_ts = ts + break + data["integrity"] = { + "last_nats_seq": last_nats_seq, + "last_nats_timestamp": last_nats_ts or datetime.now(timezone.utc).isoformat(), + "events_processed": len(events), + "source": "streaming" if events and not any(e.get("type") == "daily_note" for e in events) else "daily_notes", + } + + all_content = " ".join( + evt.get("content", evt.get("message", evt.get("text", ""))) + for evt in events if isinstance(evt, dict) + ) + data["session_mood"] = detect_mood(all_content) + + if dry_run: + print(json.dumps(data, indent=2, ensure_ascii=False)) + else: + save_json(threads_file, data) + open_count = sum(1 for t in pruned if t.get("status") == "open") + print(f" ✅ {len(pruned)} threads ({open_count} open), written to {threads_file}") + + return data + + +def show_threads(summary: bool = False, workspace: Path = None): + """Display current thread state.""" + ws = workspace or get_workspace_dir() + threads_file = get_reboot_dir(ws) / "threads.json" + data = load_json(threads_file) + threads = data.get("threads", []) + + if not threads: + print("No threads tracked.") + return + + open_threads = [t for t in threads if t.get("status") == "open"] + closed_threads = [t for t in threads if t.get("status") == "closed"] + + if summary: + print(f"Threads: {len(open_threads)} open, {len(closed_threads)} closed") + print(f"Session mood: {data.get('session_mood', 'neutral')}") + print(f"Last updated: {data.get('updated', '?')}") + return + + prio_emoji = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🔵"} + for t in open_threads: + emoji = prio_emoji.get(t.get("priority"), "⚪") + print(f"{emoji} {t['title']} [{t.get('priority', '?')}]") + print(f" {t.get('summary', '')[:120]}") + if t.get("waiting_for"): + print(f" ⏳ {t['waiting_for']}") + print() + + if closed_threads: + print(f"--- {len(closed_threads)} closed threads ---") + + +def main(): + import argparse + parser = argparse.ArgumentParser(description="Thread Tracker") + parser.add_argument("--hours", type=int, default=4, help="Look back N hours") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--summary", action="store_true", help="Show summary only") + parser.add_argument("--show", action="store_true", help="Show threads without updating") + parser.add_argument("--workspace", type=str, help="Workspace directory") + args = parser.parse_args() + + ws = Path(args.workspace) if args.workspace else None + + if args.show or args.summary: + show_threads(summary=args.summary, workspace=ws) + else: + run(hours=args.hours, dry_run=args.dry_run, workspace=ws) + + +if __name__ == "__main__": + main()