#!/usr/bin/env python3
"""Cortex Context — Generate LEARNING_CONTEXT.md from NATS events.

Ported from ~/clawd/scripts/context-generator.mjs

Usage:
    cortex context [--events 2000] [--output context.md]
    cortex context --jsonl events.jsonl
"""

import argparse
import base64
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, TextIO

from cortex.config import cortex_home


# --- Configuration ---

def learning_dir() -> Path:
    """Directory for learning output files."""
    return cortex_home() / "learning"


# Topic keywords for classification
TOPIC_KEYWORDS = {
    'coding': ['code', 'script', 'function', 'bug', 'error', 'python', 'javascript', 'node', 'npm'],
    'mondo-gate': ['mondo', 'mygate', 'fintech', 'payment', 'crypto', 'bitcoin', 'token', 'wallet'],
    'infrastructure': ['server', 'docker', 'systemd', 'service', 'nats', 'api', 'deploy', 'port'],
    'agents': ['agent', 'mona', 'vera', 'stella', 'viola', 'sub-agent', 'spawn', 'session'],
    'security': ['security', 'audit', 'vulnerability', 'password', 'credential', 'auth', 'vault'],
    'memory': ['memory', 'context', 'event', 'learning', 'brain', 'recall', 'remember'],
    'communication': ['matrix', 'telegram', 'email', 'message', 'chat', 'voice'],
    'business': ['meeting', 'pitch', 'investor', 'series', 'valuation', 'partner', 'strategy'],
    'files': ['file', 'read', 'write', 'edit', 'create', 'delete', 'directory'],
}

# Language-detection heuristics, compiled once (they run on every event).
_GERMAN_RE = re.compile(
    r'[äöüß]|(?:^|\s)(ich|du|wir|das|ist|und|oder|nicht|ein|eine|für|auf|mit|von)(?:\s|$)',
    re.IGNORECASE,
)
_ENGLISH_RE = re.compile(
    r'(?:^|\s)(the|is|are|this|that|you|we|have|has|will|would|can|could)(?:\s|$)',
    re.IGNORECASE,
)


# --- NATS CLI Helpers ---

def get_nats_bin() -> str:
    """Find the nats binary, falling back to a bare 'nats' on PATH."""
    for path in [
        os.path.expanduser('~/bin/nats'),
        '/usr/local/bin/nats',
        '/usr/bin/nats',
    ]:
        if os.path.exists(path):
            return path
    return 'nats'


def get_stream_info() -> Optional[dict]:
    """Get NATS stream info for 'openclaw-events'.

    Returns:
        Dict with 'messages', 'first_seq', 'last_seq', or None if the CLI
        call fails or returns a non-zero exit code.
    """
    nats_bin = get_nats_bin()
    try:
        result = subprocess.run(
            [nats_bin, 'stream', 'info', 'openclaw-events', '--json'],
            capture_output=True, text=True, timeout=10,
        )
        if result.returncode == 0:
            info = json.loads(result.stdout)
            state = info.get('state', {})
            return {
                'messages': state.get('messages', 0),
                'first_seq': state.get('first_seq', 1),
                'last_seq': state.get('last_seq', 0),
            }
    except Exception as e:
        print(f"❌ Error getting stream info: {e}", file=sys.stderr)
    return None


def fetch_nats_events(start_seq: int, end_seq: int) -> list[dict]:
    """Fetch events from NATS by sequence range (inclusive).

    One CLI call per sequence; individual failures are skipped silently
    (gaps in the stream are expected after message deletion).
    """
    nats_bin = get_nats_bin()
    events: list[dict] = []
    for seq in range(start_seq, end_seq + 1):
        try:
            result = subprocess.run(
                [nats_bin, 'stream', 'get', 'openclaw-events', str(seq), '--json'],
                capture_output=True, text=True, timeout=5,
            )
            if result.returncode == 0:
                msg = json.loads(result.stdout)
                # The CLI returns the message payload base64-encoded.
                data_b64 = msg.get('data', '')
                data = json.loads(base64.b64decode(data_b64).decode('utf-8'))
                events.append({
                    'seq': msg.get('seq'),
                    'subject': msg.get('subject'),
                    'time': msg.get('time'),
                    **data,
                })
        except Exception:
            continue
        # Progress indicator every 200 fetched events
        if len(events) % 200 == 0 and len(events) > 0:
            print(f"\r Fetched {len(events)} events...", end='', file=sys.stderr)
    print(file=sys.stderr)
    return events


def _parse_jsonl(lines: TextIO) -> list[dict]:
    """Parse JSONL from an iterable of lines, skipping blanks and bad JSON."""
    events = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            events.append(json.loads(line))
        except json.JSONDecodeError:
            continue
    return events


def load_jsonl_events(source: str) -> list[dict]:
    """Load events from a JSONL file, or from stdin when source is '-'."""
    if source == '-':
        return _parse_jsonl(sys.stdin)
    path = Path(source)
    if not path.exists():
        print(f"❌ File not found: {source}", file=sys.stderr)
        return []
    # Context manager guarantees the handle is closed even on parse errors.
    with path.open() as fh:
        return _parse_jsonl(fh)


# --- Analysis ---

def extract_text(event: dict) -> str:
    """Extract text content from various event formats.

    Tries, in order: payload.text_preview (list of {type,text} dicts),
    payload.data.text, payload.message, data.text. Returns '' if none match.
    """
    payload = event.get('payload') or {}
    previews = payload.get('text_preview')
    if previews:
        if isinstance(previews, list):
            return ' '.join(
                p.get('text', '') for p in previews
                if isinstance(p, dict) and p.get('type') == 'text'
            )
    payload_data = payload.get('data') or {}
    if payload_data.get('text'):
        return payload_data['text']
    if payload.get('message'):
        return payload['message']
    data = event.get('data') or {}
    if data.get('text'):
        return data['text']
    return ''


def analyze_events(events: list[dict]) -> dict:
    """Extract patterns from events.

    Returns a dict with language counts, topic/tool/agent frequency maps,
    time-of-day histogram, message-type counts, a session set, and
    truncated snippets of user/assistant messages.
    """
    patterns = {
        'languages': {'de': 0, 'en': 0},
        'topics': {},
        'tools': {},
        'time_of_day': {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0},
        'message_types': {},
        'agents': {},
        'sessions': set(),
        'user_messages': [],
        'assistant_messages': [],
    }

    for event in events:
        text = extract_text(event)
        text_lower = text.lower()
        payload = event.get('payload') or {}

        # Message type tracking
        msg_type = event.get('type', 'unknown')
        patterns['message_types'][msg_type] = patterns['message_types'].get(msg_type, 0) + 1

        # Agent tracking
        if event.get('agent'):
            agent = event['agent']
            patterns['agents'][agent] = patterns['agents'].get(agent, 0) + 1

        # Session tracking
        if event.get('session'):
            patterns['sessions'].add(event['session'])

        # Track user vs assistant messages (both dotted and underscored type names)
        if 'message_in' in msg_type or 'message.in' in msg_type:
            if len(text) > 5:
                patterns['user_messages'].append(text[:200])
        if 'message_out' in msg_type or 'message.out' in msg_type:
            if len(text) > 20:
                snippet = text[:200]
                if snippet not in patterns['assistant_messages']:
                    patterns['assistant_messages'].append(snippet)

        # Tool usage from tool events
        tool_name = (payload.get('data') or {}).get('name')
        if 'tool' in msg_type and tool_name:
            patterns['tools'][tool_name] = patterns['tools'].get(tool_name, 0) + 1

        # Skip if no text for language/topic analysis
        if len(text) < 5:
            continue

        # Language detection (simple keyword/umlaut heuristic; German wins ties)
        if _GERMAN_RE.search(text):
            patterns['languages']['de'] += 1
        elif _ENGLISH_RE.search(text):
            patterns['languages']['en'] += 1

        # Topic detection: each topic counted at most once per event
        for topic, keywords in TOPIC_KEYWORDS.items():
            for kw in keywords:
                if kw in text_lower:
                    patterns['topics'][topic] = patterns['topics'].get(topic, 0) + 1
                    break

        # Time of day bucketing (local time; comment said Europe/Berlin —
        # assumes the host runs in that zone, TODO confirm)
        ts = event.get('timestamp') or event.get('timestampMs')
        if ts:
            try:
                # Heuristic: values above 1e12 are milliseconds, else seconds.
                dt = datetime.fromtimestamp(ts / 1000 if ts > 1e12 else ts)
                hour = dt.hour
                if 6 <= hour < 12:
                    patterns['time_of_day']['morning'] += 1
                elif 12 <= hour < 18:
                    patterns['time_of_day']['afternoon'] += 1
                elif 18 <= hour < 23:
                    patterns['time_of_day']['evening'] += 1
                else:
                    patterns['time_of_day']['night'] += 1
            except Exception:
                pass

    return patterns


def generate_context_md(patterns: dict, event_count: int,
                        stream_info: Optional[dict]) -> str:
    """Generate the learning-context markdown document from analyzed patterns."""
    de = patterns['languages']['de']
    en = patterns['languages']['en']
    total_lang = de + en
    primary_lang = 'Deutsch' if de > en else 'English'
    if total_lang > 0:
        de_pct = round(de / total_lang * 100)
        en_pct = round(en / total_lang * 100)
        lang_ratio = f"{de_pct}% DE / {en_pct}% EN"
    else:
        lang_ratio = 'mixed'

    # Sort topics and tools by frequency, top 8 each
    top_topics = sorted(patterns['topics'].items(), key=lambda x: -x[1])[:8]
    top_tools = sorted(patterns['tools'].items(), key=lambda x: -x[1])[:8]

    # Busiest time-of-day bucket (ties resolved by dict order)
    tod = patterns['time_of_day']
    max_time = max(tod.items(), key=lambda x: x[1]) if tod else ('varied', 0)

    # Last 5 unique user messages, order-preserving (dict.fromkeys keeps
    # insertion order; a plain set() would shuffle them nondeterministically).
    recent_user = list(dict.fromkeys(patterns['user_messages'][-20:]))[-5:]
    recent_user = [m.replace('\n', ' ')[:80] for m in recent_user]

    stream_total = stream_info['messages'] if stream_info else None
    stream_str = f"{stream_total:,}" if isinstance(stream_total, int) else '?'

    # Pre-render list sections ('' from join falls through to the placeholder)
    topics_md = '\n'.join(f"- **{t}** — {n} mentions" for t, n in top_topics) \
        or '- (analyzing patterns...)'
    tools_md = '\n'.join(f"- `{t}` — {n}x" for t, n in top_tools) \
        or '- (no tool usage tracked)'
    agents_md = '\n'.join(
        f"- **{a}**: {c} events"
        for a, c in sorted(patterns['agents'].items(), key=lambda x: -x[1])
    ) or '- main only'
    recent_md = '\n'.join(f'> "{m}..."' for m in recent_user) \
        or '> (no recent messages captured)'

    return f"""# Learning Context for Claudia (main agent)

*Auto-generated from {event_count} events*
*Stream total: {stream_str} events*
*Updated: {datetime.now().isoformat()}*

## User Preferences

| Preference | Value |
|------------|-------|
| **Primary Language** | {primary_lang} ({lang_ratio}) |
| **Peak Activity** | {max_time[0].capitalize()} |
| **Active Sessions** | {len(patterns['sessions'])} unique |

## Current Focus Areas (by frequency)

{topics_md}

## Most Used Tools

{tools_md}

## Activity Distribution

| Time | Events |
|------|--------|
| Morning (6-12) | {tod['morning']} |
| Afternoon (12-18) | {tod['afternoon']} |
| Evening (18-23) | {tod['evening']} |
| Night (23-6) | {tod['night']} |

## Agent Distribution

{agents_md}

## Recent User Requests

{recent_md}

---

## Insights for Response Generation

Based on the patterns above:

- **Language**: Respond primarily in {primary_lang}, mixing when appropriate
- **Focus**: Current work centers on {', '.join(t[0] for t in top_topics[:3]) or 'general tasks'}
- **Tools**: Frequently use {', '.join(t[0] for t in top_tools[:3]) or 'various tools'}
- **Timing**: Most active during {max_time[0]}

*This context helps maintain continuity and personalization across sessions.*
"""


# --- Main ---

def run_context_generation(events: list[dict],
                           output_path: Optional[Path] = None,
                           agent: str = 'main',
                           stream_info: Optional[dict] = None) -> dict:
    """Analyze events, write the context markdown and a patterns.json debug dump.

    Returns:
        The JSON-serializable patterns dict (sessions as list, messages truncated).
    """
    print("📊 Context Generator", file=sys.stderr)
    print(f"   Analyzing {len(events)} events...", file=sys.stderr)

    patterns = analyze_events(events)

    print(f"   Topics: {len(patterns['topics'])}", file=sys.stderr)
    print(f"   Tools: {len(patterns['tools'])}", file=sys.stderr)
    print(f"   User messages: {len(patterns['user_messages'])}", file=sys.stderr)

    context_md = generate_context_md(patterns, len(events), stream_info)

    # Determine output path: explicit --output wins, else per-agent default
    if output_path:
        out = output_path
    else:
        ldir = learning_dir() / agent
        ldir.mkdir(parents=True, exist_ok=True)
        out = ldir / 'learning-context.md'

    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(context_md)
    print(f"✅ Written to {out}", file=sys.stderr)

    # Save patterns for debugging (sets aren't JSON-serializable; message
    # lists truncated to keep the dump small)
    patterns_out = out.parent / 'patterns.json'
    patterns_serializable = {
        **patterns,
        'sessions': list(patterns['sessions']),
        'user_messages': patterns['user_messages'][-20:],
        'assistant_messages': patterns['assistant_messages'][-10:],
    }
    patterns_out.write_text(json.dumps(patterns_serializable, indent=2, ensure_ascii=False))
    print(f"   Patterns saved to {patterns_out}", file=sys.stderr)

    return patterns_serializable


def main():
    """CLI entry point: parse args, gather events (NATS or JSONL), generate context."""
    parser = argparse.ArgumentParser(
        description='Generate LEARNING_CONTEXT.md from NATS events',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  cortex context --events 2000
  cortex context --output ~/custom-context.md
  cortex context --jsonl events.jsonl
''',
    )
    parser.add_argument('--events', type=int, default=1000,
                        help='Number of recent events to analyze (default: 1000)')
    parser.add_argument('--output', '-o', type=Path,
                        help='Output file path (default: CORTEX_HOME/learning/<agent>/learning-context.md)')
    parser.add_argument('--agent', default='main',
                        help='Agent name for output directory')
    parser.add_argument('--jsonl', metavar='FILE',
                        help='Load events from JSONL file (use - for stdin)')
    parser.add_argument('--json', action='store_true',
                        help='Output patterns as JSON')
    args = parser.parse_args()

    stream_info = None
    if args.jsonl:
        events = load_jsonl_events(args.jsonl)
    else:
        # Fetch from NATS: last --events sequences of the stream
        stream_info = get_stream_info()
        if not stream_info:
            print("❌ Could not get stream info", file=sys.stderr)
            sys.exit(1)
        print(f"   Stream has {stream_info['messages']:,} total events", file=sys.stderr)
        start_seq = max(stream_info['first_seq'],
                        stream_info['last_seq'] - args.events + 1)
        end_seq = stream_info['last_seq']
        print(f"   Fetching seq {start_seq} - {end_seq}...", file=sys.stderr)
        events = fetch_nats_events(start_seq, end_seq)

    if not events:
        print("⚠️ No events found", file=sys.stderr)
        sys.exit(0)

    patterns = run_context_generation(events, args.output, args.agent, stream_info)

    if args.json:
        print(json.dumps(patterns, indent=2, ensure_ascii=False))


if __name__ == '__main__':
    main()