darkplex-core/cortex/summarize.py
Claudia fda607c204
All checks were successful
Tests / test (push) Successful in 4s
fix: sync missing import os + stray } from darkplex-core PR #2 (YesMan)
2026-02-11 20:25:29 +01:00

209 lines
6.8 KiB
Python

#!/usr/bin/env python3
"""Daily Conversation Summarizer — reads events from NATS, summarizes via LLM.
Usage:
cortex summarize [--date YYYY-MM-DD] [--model gemma2:27b] [--dry-run]
"""
import argparse
import base64
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from cortex.config import cortex_home
# Path to the locally installed NATS CLI binary (not assumed to be on PATH).
NATS_BIN = str(Path.home() / "bin" / "nats")
# Ollama-compatible /api/generate endpoint; override via env for remote hosts.
OLLAMA_URL = os.environ.get("DARKPLEX_LLM_URL", "http://localhost:11434/api/generate")
# JetStream stream that carries the conversation events we summarize.
STREAM = "openclaw-events"
# Prompt template for the daily summary; filled with {date} and {conversations}.
# NOTE: instructs the model to answer in German — intentional, do not translate.
SUMMARY_PROMPT = """You are a personal assistant summarizing a day's conversations.
Write a concise daily summary in German. Focus on:
1. **Entscheidungen** — Was wurde beschlossen?
2. **Fortschritte** — Was wurde gebaut/erledigt?
3. **Probleme** — Was ging schief, was ist offen?
4. **Personen** — Wer war beteiligt, neue Kontakte?
5. **Nächste Schritte** — Was steht an?
Keep it under 500 words. Use bullet points. Skip heartbeats and system noise.
CONVERSATIONS FROM {date}:
{conversations}
DAILY SUMMARY (auf Deutsch):"""
def warm_dir():
    """Return the warm-storage directory where daily summaries are written."""
    base = cortex_home()
    return base / "brain" / "warm"
def get_day_events(date_str: str) -> list:
    """Fetch one calendar day's conversation events from the NATS stream.

    Strategy: read the stream's first/last timestamps and message count,
    estimate the average events-per-day rate, and from that guess the
    sequence window covering the requested day (with padding). Messages
    in that window are fetched one at a time and filtered down to real
    conversation text.

    Args:
        date_str: Day to fetch, "YYYY-MM-DD". Parsed as naive local time
            and compared against event timestamps — assumes both are in
            the same timezone (TODO confirm against the event producer).

    Returns:
        List of dicts with keys "time", "agent", "direction", "text"
        (text capped at 300 chars), in stream-sequence order. Empty list
        when the stream is unreachable or has no matching events.
    """
    day = datetime.strptime(date_str, "%Y-%m-%d")
    day_start = day.timestamp()
    day_end = (day + timedelta(days=1)).timestamp()
    try:
        r = subprocess.run([NATS_BIN, "stream", "info", STREAM, "-j"],
                           capture_output=True, text=True)
    except OSError:
        # nats binary missing or not executable — behave like an
        # unreachable stream rather than crashing the summarizer.
        return []
    if r.returncode != 0:
        return []
    info = json.loads(r.stdout)
    total = info["state"]["messages"]
    first_seq = info["state"]["first_seq"]
    last_seq = info["state"]["last_seq"]
    first_ts = datetime.fromisoformat(
        info["state"]["first_ts"].replace("Z", "+00:00")).timestamp()
    last_ts = datetime.fromisoformat(
        info["state"]["last_ts"].replace("Z", "+00:00")).timestamp()
    # Estimate the sequence range for the target day from the average
    # event rate; pad ~10% before and ~20% beyond one day's worth.
    days_active = max((last_ts - first_ts) / 86400, 1)
    events_per_day = total / days_active
    days_from_start = (day_start - first_ts) / 86400
    est_start = max(first_seq, int(first_seq + days_from_start * events_per_day - events_per_day * 0.1))
    est_end = min(last_seq, int(est_start + events_per_day * 1.2))
    events = []
    in_range = False
    for seq in range(est_start, est_end + 1):
        try:
            r = subprocess.run([NATS_BIN, "stream", "get", STREAM, str(seq), "-j"],
                               capture_output=True, text=True, timeout=3)
        except (subprocess.TimeoutExpired, OSError):
            # Previously TimeoutExpired escaped here and aborted the whole
            # scan; skip the slow/unfetchable message and keep going.
            continue
        if r.returncode != 0:
            continue
        try:
            data = json.loads(r.stdout)
            if "conversation_message" not in data.get("subject", ""):
                continue
            raw = data.get("data", "")
            if not raw:
                continue
            # Message payloads come back base64-encoded JSON.
            decoded = json.loads(base64.b64decode(raw).decode("utf-8"))
            ts = decoded.get("timestamp", 0)
            # Normalize millisecond epoch timestamps to seconds.
            if isinstance(ts, (int, float)) and ts > 1e12:
                ts = ts / 1000
            if ts < day_start:
                continue
            if ts > day_end:
                # Once we have been inside the day, the first later event
                # means we are past it — the estimate window can overshoot.
                if in_range:
                    break
                continue
            in_range = True
            # Extract the message text, trying several payload shapes.
            payload = decoded.get("payload", {})
            text = ""
            tp = payload.get("text_preview", "")
            if isinstance(tp, str):
                text = tp
            elif isinstance(tp, list):
                text = " ".join(i.get("text", "") for i in tp if isinstance(i, dict))
            if not text:
                pd = payload.get("data", {})
                if isinstance(pd, dict):
                    text = pd.get("text", "") or pd.get("content", "")
            if not text:
                text = payload.get("text", "") or payload.get("content", "")
            text = text.strip()
            # Drop trivially short messages and system noise.
            if not text or len(text) < 20:
                continue
            if "HEARTBEAT" in text or text.startswith("[cron:"):
                continue
            hour = datetime.fromtimestamp(ts).strftime("%H:%M")
            agent = decoded.get("agent", "?")
            etype = decoded.get("type", "")
            # NOTE(review): both branches are empty strings — direction
            # glyphs were likely lost upstream; preserved as-is.
            direction = "" if "message_out" in etype else ""
            events.append({"time": hour, "agent": agent, "direction": direction, "text": text[:300]})
        except Exception:
            # Best-effort parsing: any malformed message is skipped.
            continue
    return events
def summarize_with_llm(events: list, date_str: str, model: str = "gemma2:27b") -> str:
    """Generate a daily summary of *events* via the local LLM endpoint.

    Args:
        events: Event dicts as produced by get_day_events()
            (keys: "time", "direction", "agent", "text").
        date_str: Day being summarized, "YYYY-MM-DD" (prompt context only).
        model: Model name passed to the Ollama-style API.

    Returns:
        The model's summary text, or a "⚠️ Summary generation failed: ..."
        string on any failure — callers never see an exception.
    """
    # Build the transcript, capped near 4000 chars to keep the prompt small.
    # Use a list + join instead of quadratic string concatenation.
    lines = []
    size = 0
    for idx, ev in enumerate(events):
        line = f"[{ev['time']}] {ev['direction']} ({ev['agent']}) {ev['text'][:200]}\n"
        lines.append(line)
        size += len(line)
        if size > 4000:
            # Report how many events were actually truncated. The previous
            # version printed the TOTAL event count here, which misstated
            # the transcript to the model.
            lines.append(f"\n... and {len(events) - idx - 1} more messages")
            break
    conv_text = "".join(lines)
    prompt = SUMMARY_PROMPT.format(date=date_str, conversations=conv_text)
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.3, "num_predict": 1500},
    }
    try:
        # curl keeps this script dependency-free; -m 90 caps the HTTP call,
        # and the subprocess timeout (95s) is a slightly larger backstop.
        r = subprocess.run(
            ["curl", "-s", "-m", "90", OLLAMA_URL, "-d", json.dumps(payload)],
            capture_output=True, text=True, timeout=95,
        )
        return json.loads(r.stdout).get("response", "").strip()
    except Exception as e:
        return f"⚠️ Summary generation failed: {e}"
def main():
    """CLI entry point: fetch a day's events, summarize them, write to warm storage."""
    parser = argparse.ArgumentParser(description="Daily Conversation Summarizer")
    parser.add_argument("--date", type=str, help="Date to summarize (YYYY-MM-DD), default: yesterday")
    parser.add_argument("--model", type=str, default="gemma2:27b")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    # Cron use case: summarize the day that just finished.
    if args.date:
        date_str = args.date
    else:
        date_str = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    print(f"🌡️ Daily Summarizer — {date_str}")
    print("=" * 50)
    print(f"📥 Fetching events for {date_str}...")
    started = time.time()
    events = get_day_events(date_str)
    print(f" Found {len(events)} conversation events in {time.time() - started:.1f}s")
    if not events:
        print("❌ No events found for this date.")
        return

    if args.dry_run:
        # Preview mode: show up to 30 events and stop before the LLM call.
        print("\n📝 Events:")
        for ev in events[:30]:
            print(f" [{ev['time']}] {ev['direction']} ({ev['agent']}) {ev['text'][:100]}")
        hidden = len(events) - 30
        if hidden > 0:
            print(f" ... +{hidden} more")
        return

    print(f"\n🤖 Generating summary with {args.model}...")
    started = time.time()
    summary = summarize_with_llm(events, date_str, args.model)
    print(f" Done in {time.time() - started:.1f}s")

    # Persist the summary as markdown under the warm-storage directory.
    target_dir = warm_dir()
    target_dir.mkdir(parents=True, exist_ok=True)
    output_file = target_dir / f"{date_str}.md"
    header = (
        f"# Daily Summary: {date_str}\n"
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
        f"Events: {len(events)} conversations\nModel: {args.model}\n\n"
    )
    output_file.write_text(header + summary)
    print(f"\n📝 Written to {output_file}")
    print(f"\n{summary[:500]}...")


if __name__ == "__main__":
    main()