#!/usr/bin/env python3
|
|
"""Cortex Context — Generate LEARNING_CONTEXT.md from NATS events.
|
|
|
|
Ported from ~/clawd/scripts/context-generator.mjs
|
|
|
|
Usage:
|
|
cortex context [--events 2000] [--output context.md]
|
|
cortex context --jsonl events.jsonl
|
|
"""
|
|
|
|
import argparse
import base64
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

from cortex.config import cortex_home
|
|
|
|
|
|
# --- Configuration ---
|
|
|
|
def learning_dir() -> Path:
    """Return the directory that holds generated learning artifacts."""
    home = cortex_home()
    return home / "learning"
|
|
|
|
|
|
# Topic keywords for classification.
# Used by analyze_events(): event text is lowercased, then each topic whose
# keyword list contains at least one substring match is counted once per event.
TOPIC_KEYWORDS = {
    'coding': ['code', 'script', 'function', 'bug', 'error', 'python', 'javascript', 'node', 'npm'],
    'mondo-gate': ['mondo', 'mygate', 'fintech', 'payment', 'crypto', 'bitcoin', 'token', 'wallet'],
    'infrastructure': ['server', 'docker', 'systemd', 'service', 'nats', 'api', 'deploy', 'port'],
    'agents': ['agent', 'mona', 'vera', 'stella', 'viola', 'sub-agent', 'spawn', 'session'],
    'security': ['security', 'audit', 'vulnerability', 'password', 'credential', 'auth', 'vault'],
    'memory': ['memory', 'context', 'event', 'learning', 'brain', 'recall', 'remember'],
    'communication': ['matrix', 'telegram', 'email', 'message', 'chat', 'voice'],
    'business': ['meeting', 'pitch', 'investor', 'series', 'valuation', 'partner', 'strategy'],
    'files': ['file', 'read', 'write', 'edit', 'create', 'delete', 'directory'],
}
|
|
|
|
|
|
# --- NATS CLI Helpers ---
|
|
|
|
def get_nats_bin() -> str:
    """Locate the `nats` CLI binary, preferring well-known install paths."""
    candidates = (
        os.path.expanduser('~/bin/nats'),
        '/usr/local/bin/nats',
        '/usr/bin/nats',
    )
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    # Nothing at a known location — let the OS resolve it via PATH.
    return 'nats'
|
|
|
|
|
|
def get_stream_info() -> Optional[dict]:
    """Query the NATS CLI for 'openclaw-events' stream state.

    Returns:
        A dict with 'messages', 'first_seq' and 'last_seq', or None when the
        CLI is missing, times out, returns malformed JSON, or exits non-zero.
    """
    nats_bin = get_nats_bin()
    try:
        result = subprocess.run(
            [nats_bin, 'stream', 'info', 'openclaw-events', '--json'],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode == 0:
            state = json.loads(result.stdout).get('state', {})
            return {
                'messages': state.get('messages', 0),
                'first_seq': state.get('first_seq', 1),
                'last_seq': state.get('last_seq', 0),
            }
        # Previously a non-zero exit fell through silently; surface the CLI's
        # own diagnostics so failures are debuggable.
        print(f"❌ nats stream info failed (rc={result.returncode}): {result.stderr.strip()}",
              file=sys.stderr)
    except (OSError, subprocess.TimeoutExpired, json.JSONDecodeError) as e:
        # Narrowed from bare `except Exception`: missing binary, timeout,
        # or unparseable output are the expected failure modes.
        print(f"❌ Error getting stream info: {e}", file=sys.stderr)
    return None
|
|
|
|
|
|
def fetch_nats_events(start_seq: int, end_seq: int) -> list[dict]:
    """Fetch events from NATS by sequence range (inclusive).

    Each message's base64-encoded payload is decoded and merged with its
    envelope metadata (seq/subject/time). This is a best-effort bulk read:
    messages that cannot be fetched or parsed are skipped silently.

    Args:
        start_seq: first stream sequence number to fetch.
        end_seq: last stream sequence number to fetch (inclusive).
    """
    nats_bin = get_nats_bin()
    events = []

    for seq in range(start_seq, end_seq + 1):
        try:
            result = subprocess.run(
                [nats_bin, 'stream', 'get', 'openclaw-events', str(seq), '--json'],
                capture_output=True, text=True, timeout=5
            )
            if result.returncode == 0:
                msg = json.loads(result.stdout)
                # Payload arrives base64-encoded in the 'data' field.
                # (base64 import hoisted to module level — it used to be
                # re-imported on every loop iteration.)
                data_b64 = msg.get('data', '')
                data = json.loads(base64.b64decode(data_b64).decode('utf-8'))
                events.append({
                    'seq': msg.get('seq'),
                    'subject': msg.get('subject'),
                    'time': msg.get('time'),
                    **data
                })
        except Exception:
            # Best-effort: skip unreadable messages rather than abort the fetch.
            continue

        # Lightweight progress indicator on stderr every 200 events.
        if len(events) % 200 == 0 and len(events) > 0:
            print(f"\r Fetched {len(events)} events...", end='', file=sys.stderr)

    print(file=sys.stderr)
    return events
|
|
|
|
|
|
def load_jsonl_events(source: str) -> list[dict]:
    """Load events from a JSONL file, or from stdin when source is '-'.

    Blank lines and lines that fail to parse as JSON are skipped.

    Args:
        source: path to a JSONL file, or '-' for stdin.

    Returns:
        List of decoded event dicts; [] (with a stderr message) when the
        file does not exist.
    """
    if source == '-':
        return _parse_jsonl_lines(sys.stdin)

    path = Path(source)
    if not path.exists():
        print(f"❌ File not found: {source}", file=sys.stderr)
        return []

    # Context manager guarantees the handle is closed even if parsing raises
    # (the original open()/close() pair leaked the handle on exceptions).
    with path.open() as fh:
        return _parse_jsonl_lines(fh)


def _parse_jsonl_lines(lines) -> list[dict]:
    """Parse an iterable of JSONL lines, silently skipping malformed entries."""
    events = []
    for raw in lines:
        raw = raw.strip()
        if not raw:
            continue
        try:
            events.append(json.loads(raw))
        except json.JSONDecodeError:
            continue
    return events
|
|
|
|
|
|
# --- Analysis ---
|
|
|
|
def extract_text(event: dict) -> str:
    """Pull human-readable text out of an event, trying each known layout.

    Probes, in order: payload.text_preview (typed fragments), payload.data.text,
    payload.message, and top-level data.text. Returns '' when none match.
    """
    payload = event.get('payload', {})

    # Layout 1: payload.text_preview — a list of typed preview fragments;
    # only fragments of type 'text' contribute.
    previews = payload.get('text_preview')
    if previews and isinstance(previews, list):
        fragments = [
            frag.get('text', '')
            for frag in previews
            if isinstance(frag, dict) and frag.get('type') == 'text'
        ]
        return ' '.join(fragments)

    # Layout 2: payload.data.text
    data_text = payload.get('data', {}).get('text')
    if data_text:
        return data_text

    # Layout 3: payload.message
    message = payload.get('message')
    if message:
        return message

    # Layout 4: top-level data.text
    top_text = event.get('data', {}).get('text')
    if top_text:
        return top_text

    return ''
|
|
|
|
|
|
def analyze_events(events: list[dict]) -> dict:
    """Extract aggregate patterns (language, topics, tools, timing) from events.

    Args:
        events: decoded event dicts (see fetch_nats_events / load_jsonl_events).

    Returns:
        A dict of counters plus raw message snippets. Note: 'sessions' is a
        set and must be converted before JSON serialization.
    """
    # Compiled once, outside the loop — the original re-imported `re` and
    # rebuilt both patterns on every event.
    german_re = re.compile(
        r'[äöüß]|(?:^|\s)(ich|du|wir|das|ist|und|oder|nicht|ein|eine|für|auf|mit|von)(?:\s|$)',
        re.IGNORECASE)
    english_re = re.compile(
        r'(?:^|\s)(the|is|are|this|that|you|we|have|has|will|would|can|could)(?:\s|$)',
        re.IGNORECASE)

    patterns = {
        'languages': {'de': 0, 'en': 0},
        'topics': {},
        'tools': {},
        'time_of_day': {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0},
        'message_types': {},
        'agents': {},
        'sessions': set(),
        'user_messages': [],
        'assistant_messages': [],
    }

    for event in events:
        text = extract_text(event)
        text_lower = text.lower()

        # Count events per raw type string.
        msg_type = event.get('type', 'unknown')
        patterns['message_types'][msg_type] = patterns['message_types'].get(msg_type, 0) + 1

        # Per-agent event counts.
        if event.get('agent'):
            agent = event['agent']
            patterns['agents'][agent] = patterns['agents'].get(agent, 0) + 1

        # Unique session ids.
        if event.get('session'):
            patterns['sessions'].add(event['session'])

        # User vs assistant snippets — both 'message_in' and 'message.in'
        # type spellings occur in the stream.
        if 'message_in' in msg_type or 'message.in' in msg_type:
            if len(text) > 5:
                patterns['user_messages'].append(text[:200])
        if 'message_out' in msg_type or 'message.out' in msg_type:
            if len(text) > 20:
                snippet = text[:200]
                if snippet not in patterns['assistant_messages']:
                    patterns['assistant_messages'].append(snippet)

        # Tool usage, taken from tool events that carry a name.
        if 'tool' in msg_type and event.get('payload', {}).get('data', {}).get('name'):
            tool_name = event['payload']['data']['name']
            patterns['tools'][tool_name] = patterns['tools'].get(tool_name, 0) + 1

        # Too little text for the language/topic heuristics below.
        if len(text) < 5:
            continue

        # Language detection (simple keyword/umlaut heuristic).
        if german_re.search(text):
            patterns['languages']['de'] += 1
        elif english_re.search(text):
            patterns['languages']['en'] += 1

        # Topic detection: each event counts at most once per topic.
        for topic, keywords in TOPIC_KEYWORDS.items():
            if any(kw in text_lower for kw in keywords):
                patterns['topics'][topic] = patterns['topics'].get(topic, 0) + 1

        # Time-of-day bucket. NOTE: fromtimestamp() uses the machine's local
        # timezone — not necessarily Europe/Berlin as the old comment claimed.
        ts = event.get('timestamp') or event.get('timestampMs')
        if ts:
            try:
                # Heuristic: values above 1e12 are epoch milliseconds.
                dt = datetime.fromtimestamp(ts / 1000 if ts > 1e12 else ts)
                hour = dt.hour
                if 6 <= hour < 12:
                    patterns['time_of_day']['morning'] += 1
                elif 12 <= hour < 18:
                    patterns['time_of_day']['afternoon'] += 1
                elif 18 <= hour < 23:
                    patterns['time_of_day']['evening'] += 1
                else:
                    patterns['time_of_day']['night'] += 1
            except Exception:
                pass

    return patterns
|
|
|
|
|
|
def generate_context_md(patterns: dict, event_count: int, stream_info: Optional[dict]) -> str:
    """Render the learning-context markdown document.

    Args:
        patterns: output of analyze_events().
        event_count: number of events that were analyzed.
        stream_info: optional stream totals from get_stream_info().

    Returns:
        The complete markdown document as a string.
    """
    de = patterns['languages']['de']
    en = patterns['languages']['en']
    total_lang = de + en
    primary_lang = 'Deutsch' if de > en else 'English'

    if total_lang > 0:
        de_pct = round(de / total_lang * 100)
        en_pct = round(en / total_lang * 100)
        lang_ratio = f"{de_pct}% DE / {en_pct}% EN"
    else:
        lang_ratio = 'mixed'

    # Top topics/tools by frequency.
    top_topics = sorted(patterns['topics'].items(), key=lambda x: -x[1])[:8]
    top_tools = sorted(patterns['tools'].items(), key=lambda x: -x[1])[:8]

    # Busiest time-of-day bucket.
    max_time = max(patterns['time_of_day'].items(), key=lambda x: x[1]) if patterns['time_of_day'] else ('varied', 0)

    # Last 5 unique user messages. dict.fromkeys dedupes while PRESERVING
    # insertion order — the original set() round-trip scrambled ordering under
    # hash randomization, so "last 5" was actually arbitrary.
    recent_user = list(dict.fromkeys(patterns['user_messages'][-20:]))[-5:]
    recent_user = [m.replace('\n', ' ')[:80] for m in recent_user]

    stream_total = stream_info['messages'] if stream_info else None
    stream_str = f"{stream_total:,}" if isinstance(stream_total, int) else '?'

    # Pre-rendered list sections keep the template below readable.
    topics_md = '\n'.join(f"- **{t[0]}** — {t[1]} mentions" for t in top_topics) if top_topics else '- (analyzing patterns...)'
    tools_md = '\n'.join(f"- `{t[0]}` — {t[1]}x" for t in top_tools) if top_tools else '- (no tool usage tracked)'
    agents_md = '\n'.join(
        f"- **{a}**: {c} events"
        for a, c in sorted(patterns['agents'].items(), key=lambda x: -x[1])
    ) if patterns['agents'] else '- main only'
    recent_md = '\n'.join(f'> "{m}..."' for m in recent_user) if recent_user else '> (no recent messages captured)'

    md = f"""# Learning Context for Claudia (main agent)

*Auto-generated from {event_count} events*
*Stream total: {stream_str} events*
*Updated: {datetime.now().isoformat()}*

## User Preferences

| Preference | Value |
|------------|-------|
| **Primary Language** | {primary_lang} ({lang_ratio}) |
| **Peak Activity** | {max_time[0].capitalize()} |
| **Active Sessions** | {len(patterns['sessions'])} unique |

## Current Focus Areas (by frequency)

{topics_md}

## Most Used Tools

{tools_md}

## Activity Distribution

| Time | Events |
|------|--------|
| Morning (6-12) | {patterns['time_of_day']['morning']} |
| Afternoon (12-18) | {patterns['time_of_day']['afternoon']} |
| Evening (18-23) | {patterns['time_of_day']['evening']} |
| Night (23-6) | {patterns['time_of_day']['night']} |

## Agent Distribution

{agents_md}

## Recent User Requests

{recent_md}

---

## Insights for Response Generation

Based on the patterns above:
- **Language**: Respond primarily in {primary_lang}, mixing when appropriate
- **Focus**: Current work centers on {', '.join(t[0] for t in top_topics[:3]) or 'general tasks'}
- **Tools**: Frequently use {', '.join(t[0] for t in top_tools[:3]) or 'various tools'}
- **Timing**: Most active during {max_time[0]}

*This context helps maintain continuity and personalization across sessions.*
"""
    return md
|
|
|
|
|
|
# --- Main ---
|
|
|
|
def run_context_generation(events: list[dict], output_path: Optional[Path] = None,
                           agent: str = 'main', stream_info: Optional[dict] = None) -> dict:
    """Analyze events, write the context markdown and patterns.json, return patterns.

    Args:
        events: decoded event dicts.
        output_path: explicit output file; when None, defaults to
            CORTEX_HOME/learning/<agent>/learning-context.md.
        agent: agent subdirectory used for the default output path.
        stream_info: optional stream totals, embedded in the markdown header.

    Returns:
        JSON-serializable pattern dict (sessions converted to a list,
        message snippet lists truncated).
    """
    print("📊 Context Generator", file=sys.stderr)  # was an f-string with no placeholders
    print(f" Analyzing {len(events)} events...", file=sys.stderr)

    patterns = analyze_events(events)

    print(f" Topics: {len(patterns['topics'])}", file=sys.stderr)
    print(f" Tools: {len(patterns['tools'])}", file=sys.stderr)
    print(f" User messages: {len(patterns['user_messages'])}", file=sys.stderr)

    context_md = generate_context_md(patterns, len(events), stream_info)

    # Resolve the output location; one mkdir on the parent covers both the
    # explicit and the default path (the original created the default dir twice).
    if output_path:
        out = output_path
    else:
        out = learning_dir() / agent / 'learning-context.md'
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(context_md)

    print(f"✅ Written to {out}", file=sys.stderr)

    # Persist the raw pattern data alongside the markdown for debugging.
    patterns_out = out.parent / 'patterns.json'
    patterns_serializable = {
        **patterns,
        'sessions': list(patterns['sessions']),          # sets are not JSON-serializable
        'user_messages': patterns['user_messages'][-20:],
        'assistant_messages': patterns['assistant_messages'][-10:],
    }
    patterns_out.write_text(json.dumps(patterns_serializable, indent=2, ensure_ascii=False))
    print(f" Patterns saved to {patterns_out}", file=sys.stderr)

    return patterns_serializable
|
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Construct the argument parser for the `cortex context` command."""
    parser = argparse.ArgumentParser(
        description='Generate LEARNING_CONTEXT.md from NATS events',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  cortex context --events 2000
  cortex context --output ~/custom-context.md
  cortex context --jsonl events.jsonl
'''
    )
    parser.add_argument('--events', type=int, default=1000,
                        help='Number of recent events to analyze (default: 1000)')
    parser.add_argument('--output', '-o', type=Path,
                        help='Output file path (default: CORTEX_HOME/learning/<agent>/learning-context.md)')
    parser.add_argument('--agent', default='main',
                        help='Agent name for output directory')
    parser.add_argument('--jsonl', metavar='FILE',
                        help='Load events from JSONL file (use - for stdin)')
    parser.add_argument('--json', action='store_true',
                        help='Output patterns as JSON')
    return parser


def main():
    """CLI entry point: gather events (JSONL or NATS), then generate context."""
    args = _build_parser().parse_args()

    stream_info = None

    if args.jsonl:
        # Offline mode: read pre-exported events.
        events = load_jsonl_events(args.jsonl)
    else:
        # Live mode: pull the most recent N events from the NATS stream.
        stream_info = get_stream_info()
        if not stream_info:
            print("❌ Could not get stream info", file=sys.stderr)
            sys.exit(1)

        print(f" Stream has {stream_info['messages']:,} total events", file=sys.stderr)

        last = stream_info['last_seq']
        start_seq = max(stream_info['first_seq'], last - args.events + 1)
        end_seq = last

        print(f" Fetching seq {start_seq} - {end_seq}...", file=sys.stderr)
        events = fetch_nats_events(start_seq, end_seq)

    if not events:
        print("⚠️ No events found", file=sys.stderr)
        sys.exit(0)

    patterns = run_context_generation(events, args.output, args.agent, stream_info)

    if args.json:
        print(json.dumps(patterns, indent=2, ensure_ascii=False))


if __name__ == '__main__':
    main()
|