#!/usr/bin/env python3
"""Daily Conversation Summarizer — reads events from NATS, summarizes via LLM.

Usage:
    cortex summarize [--date YYYY-MM-DD] [--model gemma2:27b] [--dry-run]
"""

import argparse
import base64
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

from cortex.config import cortex_home

NATS_BIN = str(Path.home() / "bin" / "nats")
OLLAMA_URL = os.environ.get("DARKPLEX_LLM_URL", "http://localhost:11434/api/generate")
STREAM = "openclaw-events"

SUMMARY_PROMPT = """You are a personal assistant summarizing a day's conversations.
Write a concise daily summary in German. Focus on:

1. **Entscheidungen** — Was wurde beschlossen?
2. **Fortschritte** — Was wurde gebaut/erledigt?
3. **Probleme** — Was ging schief, was ist offen?
4. **Personen** — Wer war beteiligt, neue Kontakte?
5. **Nächste Schritte** — Was steht an?

Keep it under 500 words. Use bullet points. Skip heartbeats and system noise.

CONVERSATIONS FROM {date}:
{conversations}

DAILY SUMMARY (auf Deutsch):"""

# Seconds to wait for the `nats` CLI before giving up on a call.
_NATS_INFO_TIMEOUT = 10
_NATS_GET_TIMEOUT = 3

# Soft cap on the number of conversation characters sent to the LLM.
_MAX_PROMPT_CHARS = 4000

# Errors a malformed or unexpected stream message can trigger while it is
# being decoded; such messages are skipped instead of aborting the run.
_SKIPPABLE_ERRORS = (
    ValueError,  # bad JSON, bad base64, bad UTF-8, out-of-range timestamp
    TypeError,
    AttributeError,
    LookupError,
    OverflowError,
    OSError,
)


def warm_dir():
    """Return the directory (under the cortex home) where summaries are stored."""
    return cortex_home() / "brain" / "warm"


def _parse_nats_ts(value: str) -> float:
    """Convert a timestamp string from the stream info JSON to epoch seconds.

    The fractional-second part is normalised to exactly six digits because
    ``datetime.fromisoformat`` on Python < 3.11 rejects any other length
    (for example nanosecond precision).
    """
    text = value.strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    head, dot, tail = text.partition(".")
    if dot:
        digits = len(tail) - len(tail.lstrip("0123456789"))
        if digits:
            fraction = tail[:digits][:6].ljust(6, "0")
            text = f"{head}.{fraction}{tail[digits:]}"
    return datetime.fromisoformat(text).timestamp()


def _estimate_seq_window(day_start: float) -> Optional[tuple]:
    """Guess the stream sequence range that holds the day starting at *day_start*.

    Assumes messages are spread evenly over the lifetime of the stream and
    pads the estimate by 10% of a day's volume on both sides.

    Returns ``(first, last)`` (inclusive), or ``None`` when the stream info
    cannot be obtained or the stream is empty.
    """
    try:
        r = subprocess.run(
            [NATS_BIN, "stream", "info", STREAM, "-j"],
            capture_output=True,
            text=True,
            timeout=_NATS_INFO_TIMEOUT,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        print(f"⚠️ Could not query stream {STREAM}: {exc}", file=sys.stderr)
        return None
    if r.returncode != 0:
        return None

    try:
        state = json.loads(r.stdout)["state"]
        total = state["messages"]
        first_seq = state["first_seq"]
        last_seq = state["last_seq"]
        if not total:
            return None
        first_ts = _parse_nats_ts(state["first_ts"])
        last_ts = _parse_nats_ts(state["last_ts"])
    except _SKIPPABLE_ERRORS as exc:
        print(f"⚠️ Unexpected stream info for {STREAM}: {exc}", file=sys.stderr)
        return None

    days_active = max((last_ts - first_ts) / 86400, 1)
    events_per_day = total / days_active
    days_from_start = (day_start - first_ts) / 86400
    est_start = max(
        first_seq,
        int(first_seq + days_from_start * events_per_day - events_per_day * 0.1),
    )
    est_end = min(last_seq, int(est_start + events_per_day * 1.2))
    return est_start, est_end


def _fetch_message(seq: int) -> Optional[str]:
    """Return the raw JSON for stream message *seq*, or ``None`` if unavailable."""
    try:
        r = subprocess.run(
            [NATS_BIN, "stream", "get", STREAM, str(seq), "-j"],
            capture_output=True,
            text=True,
            timeout=_NATS_GET_TIMEOUT,
        )
    except (OSError, subprocess.TimeoutExpired):
        # One slow or failed lookup must not abort the whole scan.
        return None
    if r.returncode != 0:
        return None
    return r.stdout


def _decode_conversation(raw_json: str):
    """Decode the base64 JSON body of a conversation message.

    Returns ``None`` for messages on other subjects or without a body.
    """
    envelope = json.loads(raw_json)
    if "conversation_message" not in envelope.get("subject", ""):
        return None
    encoded = envelope.get("data", "")
    if not encoded:
        return None
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


def _extract_text(payload: dict) -> str:
    """Pull the message text out of *payload*, trying each known location in turn."""
    text = ""
    preview = payload.get("text_preview", "")
    if isinstance(preview, str):
        text = preview
    elif isinstance(preview, list):
        text = " ".join(
            part.get("text", "") for part in preview if isinstance(part, dict)
        )
    if not text:
        nested = payload.get("data", {})
        if isinstance(nested, dict):
            text = nested.get("text", "") or nested.get("content", "")
    if not text:
        text = payload.get("text", "") or payload.get("content", "")
    return text.strip()


def _build_event(decoded: dict, ts: float) -> Optional[dict]:
    """Turn a decoded message into a summary event, or ``None`` if it is noise."""
    text = _extract_text(decoded.get("payload", {}))
    if len(text) < 20:
        return None
    if "HEARTBEAT" in text or text.startswith("[cron:"):
        return None
    etype = decoded.get("type", "")
    return {
        "time": datetime.fromtimestamp(ts).strftime("%H:%M"),
        "agent": decoded.get("agent", "?"),
        "direction": "→" if "message_out" in etype else "←",
        "text": text[:300],
    }


def get_day_events(date_str: str) -> list:
    """Collect the conversation events of one day from the NATS stream.

    The day runs from local midnight of *date_str* (``YYYY-MM-DD``) up to,
    but not including, the following local midnight. Only an estimated
    window of sequence numbers is scanned (see ``_estimate_seq_window``).

    Args:
        date_str: Day to fetch, formatted ``YYYY-MM-DD``.

    Returns:
        A list of dicts with the keys ``time`` (``HH:MM``), ``agent``,
        ``direction`` (``→`` for outgoing, ``←`` otherwise) and ``text``
        (at most 300 characters). Empty when the stream is unavailable.

    Raises:
        ValueError: If *date_str* is not a valid ``YYYY-MM-DD`` date.
    """
    day = datetime.strptime(date_str, "%Y-%m-%d")
    day_start = day.timestamp()
    day_end = (day + timedelta(days=1)).timestamp()

    window = _estimate_seq_window(day_start)
    if window is None:
        return []
    est_start, est_end = window

    events = []
    in_range = False
    for seq in range(est_start, est_end + 1):
        raw_json = _fetch_message(seq)
        if raw_json is None:
            continue
        try:
            decoded = _decode_conversation(raw_json)
            if decoded is None:
                continue
            ts = decoded.get("timestamp", 0)
            # Values above 1e12 are taken to be milliseconds, not seconds.
            if isinstance(ts, (int, float)) and ts > 1e12:
                ts = ts / 1000
            if ts < day_start:
                continue
            if ts >= day_end:
                # Once inside the day, the first later message ends the scan.
                if in_range:
                    break
                continue
            in_range = True
            event = _build_event(decoded, ts)
        except _SKIPPABLE_ERRORS:
            continue
        if event is not None:
            events.append(event)
    return events


def _format_conversations(events: list, limit: int = _MAX_PROMPT_CHARS) -> str:
    """Render *events* as prompt lines, stopping once *limit* characters are exceeded."""
    lines = []
    size = 0
    for index, ev in enumerate(events, start=1):
        line = f"[{ev['time']}] {ev['direction']} ({ev['agent']}) {ev['text'][:200]}\n"
        lines.append(line)
        size += len(line)
        if size > limit:
            remaining = len(events) - index
            if remaining:
                lines.append(f"\n... and {remaining} more messages")
            break
    return "".join(lines)


def summarize_with_llm(events: list, date_str: str, model: str = "gemma2:27b") -> str:
    """Ask the LLM at ``OLLAMA_URL`` for a summary of *events*.

    Args:
        events: Event dicts as produced by ``get_day_events``.
        date_str: Date label inserted into the prompt.
        model: Model name sent in the request.

    Returns:
        The summary text, or a message starting with
        ``⚠️ Summary generation failed:`` when the request fails.
    """
    prompt = SUMMARY_PROMPT.format(
        date=date_str, conversations=_format_conversations(events)
    )
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.3, "num_predict": 1500},
    }
    try:
        r = subprocess.run(
            ["curl", "-s", "-m", "90", OLLAMA_URL, "-d", json.dumps(payload)],
            capture_output=True,
            text=True,
            timeout=95,
        )
        if r.returncode != 0:
            return f"⚠️ Summary generation failed: curl exited with status {r.returncode}"
        reply = json.loads(r.stdout)
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        return f"⚠️ Summary generation failed: {e}"

    if not isinstance(reply, dict):
        return "⚠️ Summary generation failed: unexpected response from LLM server"
    response = str(reply.get("response") or "").strip()
    if not response and reply.get("error"):
        # Surface server-side errors instead of returning an empty summary.
        return f"⚠️ Summary generation failed: {reply['error']}"
    return response


def _valid_date(value: str) -> str:
    """argparse type: validate *value* and return it normalised to ``YYYY-MM-DD``."""
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}, expected YYYY-MM-DD"
        ) from None
    return parsed.strftime("%Y-%m-%d")


def main():
    """CLI entry point: fetch a day's events, summarize them and store the result."""
    parser = argparse.ArgumentParser(description="Daily Conversation Summarizer")
    parser.add_argument(
        "--date",
        type=_valid_date,
        help="Date to summarize (YYYY-MM-DD), default: yesterday",
    )
    parser.add_argument("--model", type=str, default="gemma2:27b")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    date_str = args.date or (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    print(f"🌡️ Daily Summarizer — {date_str}")
    print("=" * 50)

    print(f"📥 Fetching events for {date_str}...")
    t0 = time.time()
    events = get_day_events(date_str)
    print(f" Found {len(events)} conversation events in {time.time() - t0:.1f}s")
    if not events:
        print("❌ No events found for this date.")
        return

    if args.dry_run:
        # Dry run: show a sample of the events and skip the LLM call.
        print("\n📝 Events:")
        for ev in events[:30]:
            print(f" [{ev['time']}] {ev['direction']} ({ev['agent']}) {ev['text'][:100]}")
        if len(events) > 30:
            print(f" ... +{len(events) - 30} more")
        return

    print(f"\n🤖 Generating summary with {args.model}...")
    t0 = time.time()
    summary = summarize_with_llm(events, date_str, args.model)
    print(f" Done in {time.time() - t0:.1f}s")

    wd = warm_dir()
    wd.mkdir(parents=True, exist_ok=True)
    output_file = wd / f"{date_str}.md"
    content = f"# Daily Summary: {date_str}\n"
    content += f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
    content += f"Events: {len(events)} conversations\nModel: {args.model}\n\n"
    content += summary
    # Explicit encoding: the summary contains non-ASCII text regardless of locale.
    output_file.write_text(content, encoding="utf-8")
    print(f"\n📝 Written to {output_file}")

    preview = summary[:500]
    if len(summary) > 500:
        preview += "..."
    print(f"\n{preview}")


if __name__ == "__main__":
    main()