"""Thread Tracker — Tracks open conversation threads from NATS events. Reads recent NATS events and extracts/updates conversation threads. Thread detection heuristics: topic shifts, decision markers, status markers, waiting markers. """ import json import re import subprocess import sys from datetime import datetime, timezone, timedelta from pathlib import Path from .common import get_workspace_dir, get_reboot_dir, get_nats_env, load_json, save_json # Patterns for thread detection DECISION_PATTERNS = [ r"(?:decided|entschieden|decision|beschlossen|agreed|let'?s do|machen wir|wir machen)", r"(?:the plan is|der plan ist|approach:|ansatz:)", ] CLOSE_PATTERNS = [ r"(?:done|erledigt|fixed|gefixt|gelöst|solved|closed|fertig|works|funktioniert|✅)", ] WAIT_PATTERNS = [ r"(?:waiting for|warte auf|blocked by|blockiert durch|need.*first|brauche.*erst)", ] TOPIC_PATTERNS = [ r"(?:back to|zurück zu|jetzt zu|now about|regarding|bzgl\.?|wegen)\s+(\w[\w\s-]{2,30})", ] MOOD_PATTERNS = { "frustrated": r"(?:fuck|shit|mist|nervig|genervt|damn|wtf|argh|schon wieder|zum kotzen|sucks)", "excited": r"(?:geil|nice|awesome|krass|boom|läuft|yes!|🎯|🚀|perfekt|brilliant|mega|sick)", "tense": r"(?:vorsicht|careful|risky|heikel|kritisch|dringend|urgent|achtung|gefährlich)", "productive": r"(?:erledigt|done|fixed|works|fertig|deployed|✅|gebaut|shipped|läuft)", "exploratory": r"(?:was wäre wenn|what if|könnte man|idea|idee|maybe|vielleicht|experiment)", } def detect_mood(text: str) -> str: """Detect overall mood from text. Last match wins.""" if not text: return "neutral" text_lower = text.lower() last_mood = "neutral" last_pos = -1 for mood, pattern in MOOD_PATTERNS.items(): for m in re.finditer(pattern, text_lower): if m.start() > last_pos: last_pos = m.start() last_mood = mood return last_mood def fetch_recent_events(hours: int = 4, workspace: Path = None) -> list[dict]: """Fetch recent user/agent messages from NATS, with daily note fallback.""" ws = workspace or get_workspace_dir() env = get_nats_env(ws) events = [] try: result = subprocess.run( ["nats", "stream", "get", "openclaw-events", "--last", "200", "--raw"], capture_output=True, text=True, timeout=15, env=env ) if result.returncode == 0 and result.stdout.strip(): for line in result.stdout.strip().split("\n"): try: events.append(json.loads(line)) except json.JSONDecodeError: continue except Exception: pass # Fallback: read from daily notes if not events: today = datetime.now().strftime("%Y-%m-%d") yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") for date in [today, yesterday]: note_file = ws / "memory" / f"{date}.md" if note_file.exists(): events.append({ "type": "daily_note", "date": date, "content": note_file.read_text()[:5000] }) return events def extract_thread_signals(text: str) -> dict: """Extract thread-related signals from text.""" text_lower = text.lower() signals = {"decisions": [], "closures": [], "waits": [], "topics": []} for pattern in DECISION_PATTERNS: for m in re.finditer(pattern, text_lower): start = max(0, m.start() - 50) end = min(len(text), m.end() + 100) signals["decisions"].append(text[start:end].strip()) for pattern in CLOSE_PATTERNS: if re.search(pattern, text_lower): signals["closures"].append(True) for pattern in WAIT_PATTERNS: for m in re.finditer(pattern, text_lower): start = m.start() end = min(len(text), m.end() + 80) signals["waits"].append(text[start:end].strip()) for pattern in TOPIC_PATTERNS: for m in re.finditer(pattern, text_lower): if m.group(1): signals["topics"].append(m.group(1).strip()) return signals def process_events(events: list[dict], existing_threads: list[dict]) -> list[dict]: """Process events and update thread list.""" threads = existing_threads.copy() for evt in events: content = "" if isinstance(evt, dict): content = evt.get("content", evt.get("message", evt.get("text", ""))) if not content and evt.get("type") == "daily_note": content = evt.get("content", "") if not content: continue signals = extract_thread_signals(content) if signals["closures"]: for t in threads: if t["status"] == "open": thread_words = set(t.get("title", "").lower().split()) content_words = set(content.lower().split()) if len(thread_words & content_words) >= 2: t["status"] = "closed" t["last_activity"] = datetime.now(timezone.utc).isoformat() for decision_ctx in signals["decisions"]: for t in threads: if t["status"] == "open": thread_words = set(t.get("title", "").lower().split()) decision_words = set(decision_ctx.lower().split()) if len(thread_words & decision_words) >= 2: if decision_ctx not in t.get("decisions", []): t.setdefault("decisions", []).append(decision_ctx[:100]) return threads def run(hours: int = 4, dry_run: bool = False, workspace: Path = None) -> dict: """Run thread tracker. Returns the threads data dict.""" ws = workspace or get_workspace_dir() reboot_dir = get_reboot_dir(ws) threads_file = reboot_dir / "threads.json" print(f"🧵 Thread Tracker — scanning last {hours}h of events...") data = load_json(threads_file) existing_threads = data.get("threads", []) events = fetch_recent_events(hours, ws) print(f" Found {len(events)} events") updated_threads = process_events(events, existing_threads) # Prune closed threads older than 7 days cutoff = (datetime.now(timezone.utc) - timedelta(days=7)).isoformat() pruned = [t for t in updated_threads if not (t.get("status") == "closed" and t.get("last_activity", "") < cutoff)] data["threads"] = pruned data["updated"] = datetime.now(timezone.utc).isoformat() data["version"] = 2 # Integrity tracking last_nats_ts = None last_nats_seq = None for evt in reversed(events): if isinstance(evt, dict): if evt.get("seq"): last_nats_seq = evt["seq"] ts = evt.get("timestamp", evt.get("time", evt.get("date"))) if ts: last_nats_ts = ts break data["integrity"] = { "last_nats_seq": last_nats_seq, "last_nats_timestamp": last_nats_ts or datetime.now(timezone.utc).isoformat(), "events_processed": len(events), "source": "streaming" if events and not any(e.get("type") == "daily_note" for e in events) else "daily_notes", } all_content = " ".join( evt.get("content", evt.get("message", evt.get("text", ""))) for evt in events if isinstance(evt, dict) ) data["session_mood"] = detect_mood(all_content) if dry_run: print(json.dumps(data, indent=2, ensure_ascii=False)) else: save_json(threads_file, data) open_count = sum(1 for t in pruned if t.get("status") == "open") print(f" ✅ {len(pruned)} threads ({open_count} open), written to {threads_file}") return data def show_threads(summary: bool = False, workspace: Path = None): """Display current thread state.""" ws = workspace or get_workspace_dir() threads_file = get_reboot_dir(ws) / "threads.json" data = load_json(threads_file) threads = data.get("threads", []) if not threads: print("No threads tracked.") return open_threads = [t for t in threads if t.get("status") == "open"] closed_threads = [t for t in threads if t.get("status") == "closed"] if summary: print(f"Threads: {len(open_threads)} open, {len(closed_threads)} closed") print(f"Session mood: {data.get('session_mood', 'neutral')}") print(f"Last updated: {data.get('updated', '?')}") return prio_emoji = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🔵"} for t in open_threads: emoji = prio_emoji.get(t.get("priority"), "⚪") print(f"{emoji} {t['title']} [{t.get('priority', '?')}]") print(f" {t.get('summary', '')[:120]}") if t.get("waiting_for"): print(f" ⏳ {t['waiting_for']}") print() if closed_threads: print(f"--- {len(closed_threads)} closed threads ---") def main(): import argparse parser = argparse.ArgumentParser(description="Thread Tracker") parser.add_argument("--hours", type=int, default=4, help="Look back N hours") parser.add_argument("--dry-run", action="store_true") parser.add_argument("--summary", action="store_true", help="Show summary only") parser.add_argument("--show", action="store_true", help="Show threads without updating") parser.add_argument("--workspace", type=str, help="Workspace directory") args = parser.parse_args() ws = Path(args.workspace) if args.workspace else None if args.show or args.summary: show_threads(summary=args.summary, workspace=ws) else: run(hours=args.hours, dry_run=args.dry_run, workspace=ws) if __name__ == "__main__": main()