#!/usr/bin/env python3
"""Cortex Learn — Extract preferences and behavior patterns from NATS events.

Ported from ~/clawd/scripts/learning-processor.mjs

Usage:
    cortex learn --since 24h [--nats nats://localhost:4222]
    cortex learn --jsonl input.jsonl
    cat events.jsonl | cortex learn --jsonl -
"""

import argparse
import json
import os
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

from cortex.config import cortex_home

# Try to import nats (optional dependency)
try:
    import nats
    HAS_NATS = True
except ImportError:
    HAS_NATS = False


# --- Directories ---

def learning_dir() -> Path:
    """Directory for learning output files."""
    return cortex_home() / "learning"


# --- Constants ---

# German + English stopwords to filter from topics
STOPWORDS = {
    # German
    'nicht', 'dass', 'wenn', 'dann', 'aber', 'oder', 'und', 'der', 'die', 'das',
    'ein', 'eine', 'einer', 'ist', 'sind', 'war', 'waren', 'wird', 'werden',
    'hat', 'haben', 'kann', 'können', 'muss', 'müssen', 'soll', 'sollte', 'würde',
    'hier', 'dort', 'jetzt', 'noch', 'schon', 'auch', 'nur', 'sehr', 'mehr',
    'viel', 'ganz', 'alle', 'allem', 'allen', 'aller', 'alles', 'heute', 'morgen',
    'gestern', 'immer', 'wieder', 'also', 'dabei', 'damit', 'danach', 'darin',
    'darum', 'davon', 'dazu', 'denn', 'doch', 'durch',
    # English
    'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'had', 'her',
    'was', 'one', 'our', 'out', 'has', 'have', 'been', 'would', 'could', 'this',
    'that', 'with', 'from', 'what', 'which', 'their', 'will', 'there',
}

# Signal patterns for preference extraction
POSITIVE_PATTERNS = [
    r'\bmega\b', r'\bsuper\b', r'\bperfekt\b', r'\bgenau\s*so\b', r'\bja\s*!',
    r'\bcool\b', r'gut\s*gemacht', r'das\s*war\s*gut', r'gefällt\s*mir',
    r'\bklasse\b', r'\btoll\b',
    r'👍', r'🎉', r'💪', r'❤️', r'😘', r'🚀', r'✅', r'💯',
]

NEGATIVE_PATTERNS = [
    r'\bnein,\s*ich', r'nicht\s*so,', r'\bfalsch\b', r'\bstopp\b',
    r'das\s*stimmt\s*nicht', r'ich\s*meinte\s*eigentlich',
    r'❌', r'👎',
]

CORRECTION_PATTERNS = [
    r'nein,?\s*ich\s*meinte', r'nicht\s*so,?\s*sondern',
    r'das\s*ist\s*falsch', r'eigentlich\s*wollte\s*ich',
]

# Compile patterns for efficiency
POSITIVE_RE = [re.compile(p, re.IGNORECASE) for p in POSITIVE_PATTERNS]
NEGATIVE_RE = [re.compile(p, re.IGNORECASE) for p in NEGATIVE_PATTERNS]
CORRECTION_RE = [re.compile(p, re.IGNORECASE) for p in CORRECTION_PATTERNS]


# --- Helper Functions ---

def parse_duration(duration_str: str) -> timedelta:
    """Parse a duration string like '24h', '7d', '30m' into a timedelta."""
    match = re.match(r'^(\d+)([hdm])$', duration_str.lower())
    if not match:
        raise ValueError(f"Invalid duration format: {duration_str}. Use like '24h', '7d', '30m'")
    value = int(match.group(1))
    unit = match.group(2)
    if unit == 'h':
        return timedelta(hours=value)
    elif unit == 'd':
        return timedelta(days=value)
    elif unit == 'm':
        return timedelta(minutes=value)
    else:
        raise ValueError(f"Unknown unit: {unit}")

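# Illustrative behaviour of parse_duration (derived from the regex above, not an
# exhaustive spec):
#   parse_duration("24h") -> timedelta(hours=24)
#   parse_duration("7d")  -> timedelta(days=7)
#   parse_duration("30m") -> timedelta(minutes=30)
#   parse_duration("1w")  -> ValueError (only h/d/m units are accepted)
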
def load_json(path: Path, default: Any = None) -> Any:
    """Load a JSON file, returning default if not found."""
    if path.exists():
        return json.loads(path.read_text())
    return default if default is not None else {}


def save_json(path: Path, data: Any) -> None:
    """Save data as a JSON file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data, indent=2, ensure_ascii=False))


def extract_text(event: dict) -> str:
    """Extract text content from various event formats."""
    # Try the different paths where text might live
    if isinstance(event.get('payload'), dict):
        payload = event['payload']
        if 'data' in payload and isinstance(payload['data'], dict):
            if 'text' in payload['data']:
                return payload['data']['text']
        if 'content' in payload:
            return payload['content']
        if 'text_preview' in payload:
            previews = payload['text_preview']
            if isinstance(previews, list):
                return ' '.join(p.get('text', '') for p in previews if isinstance(p, dict))
    if 'data' in event and isinstance(event['data'], dict):
        if 'text' in event['data']:
            return event['data']['text']
    return ''

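# Illustrative only: hypothetical event shapes that extract_text() can read.
# The field names mirror the lookups above; they are assumptions, not a schema:
#   {"timestamp": 1700000000000, "payload": {"data": {"text": "Mega, genau so! 🎉"}}}
#   {"timestamp": 1700000000000, "payload": {"content": "Nein, ich meinte eigentlich ..."}}
#   {"timestamp": 1700000000000, "data": {"text": "npm error: module not found"}}
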
# --- Signal Extraction ---

def extract_signals(events: list[dict]) -> dict:
    """Extract preference signals from events."""
    signals = {
        'positive': [],
        'negative': [],
        'corrections': [],
        'topics': {},       # topic -> count
        'skill_gaps': {},   # domain -> count
        'task_outcomes': [],
    }

    for event in events:
        text = extract_text(event)
        if not text:
            continue

        timestamp = event.get('timestamp', event.get('timestampMs', 0))

        # Check for positive signals
        for pattern in POSITIVE_RE:
            if pattern.search(text):
                signals['positive'].append({
                    'timestamp': timestamp,
                    'text': text[:200],
                    'pattern': pattern.pattern,
                })
                break

        # Check for negative signals
        for pattern in NEGATIVE_RE:
            if pattern.search(text):
                signals['negative'].append({
                    'timestamp': timestamp,
                    'text': text[:200],
                    'pattern': pattern.pattern,
                })
                break

        # Check for corrections
        for pattern in CORRECTION_RE:
            if pattern.search(text):
                signals['corrections'].append({
                    'timestamp': timestamp,
                    'text': text[:200],
                })
                break

        # Extract topics (capitalized words/phrases)
        topic_matches = re.findall(r'[A-Z][a-zäöü]+(?:\s+[A-Z][a-zäöü]+)*', text)
        for topic in topic_matches:
            if len(topic) > 3 and topic.lower() not in STOPWORDS:
                signals['topics'][topic] = signals['topics'].get(topic, 0) + 1

        # Detect skill gaps (errors, retries)
        if re.search(r'error|fehler|failed|nicht gefunden|module not found', text, re.IGNORECASE):
            domains = re.findall(r'TypeScript|NATS|Matrix|Python|Node|npm|git', text, re.IGNORECASE)
            for domain in (domains or ['unknown']):
                d = domain.lower()
                signals['skill_gaps'][d] = signals['skill_gaps'].get(d, 0) + 1

        # Track task outcomes (lifecycle events)
        event_type = event.get('type', '')
        if 'lifecycle' in event_type:
            phase = event.get('payload', {}).get('data', {}).get('phase', '')
            if phase in ('end', 'error'):
                signals['task_outcomes'].append({
                    'timestamp': timestamp,
                    'success': phase == 'end',
                    'session': event.get('session', ''),
                })

    return signals


def derive_preferences(signals: dict, existing: dict) -> dict:
    """Derive preferences from extracted signals."""
    prefs = {**existing}

    positive_count = len(signals['positive'])
    negative_count = len(signals['negative'])
    correction_count = len(signals['corrections'])

    prefs['lastAnalysis'] = datetime.now().isoformat()
    prefs['signalCounts'] = {
        'positive': positive_count,
        'negative': negative_count,
        'corrections': correction_count,
    }

    # Derive insights: frequent corrections suggest unclear requests,
    # a strong positive/negative ratio suggests the current approach works
    prefs['derived'] = prefs.get('derived', {})
    if correction_count > 5:
        prefs['derived']['needsMoreClarification'] = True
    if positive_count > negative_count * 2:
        prefs['derived']['approachWorking'] = True

    # Top topics
    top_topics = sorted(signals['topics'].items(), key=lambda x: -x[1])[:10]
    prefs['topTopics'] = [{'topic': t, 'count': c} for t, c in top_topics]

    # Explicit preferences (defaults)
    prefs['explicit'] = {
        'responseLength': 'concise',
        'technicalDepth': 'high',
        'humor': 'appreciated',
        'tablesInTelegram': 'never',
        'proactiveSuggestions': 'always',
        'fleiß': 'mandatory',
        'ehrgeiz': 'mandatory',
        'biss': 'mandatory',
    }

    return prefs


def derive_behaviors(signals: dict, existing: dict) -> dict:
    """Derive behavior patterns from signals."""
    behaviors = {**existing}
    behaviors['lastAnalysis'] = datetime.now().isoformat()
    behaviors['successPatterns'] = behaviors.get('successPatterns', [])
    behaviors['failurePatterns'] = behaviors.get('failurePatterns', [])

    # Recent positive signals indicate successful patterns
    for pos in signals['positive'][-10:]:
        behaviors['successPatterns'].append({
            'timestamp': pos['timestamp'],
            'context': pos['text'][:100],
        })

    # Keep only the last 50 patterns
    behaviors['successPatterns'] = behaviors['successPatterns'][-50:]
    behaviors['failurePatterns'] = behaviors['failurePatterns'][-50:]

    return behaviors


def generate_learning_context(prefs: dict, behaviors: dict, topics: dict, skill_gaps: dict) -> str:
    """Generate LEARNING_CONTEXT.md content."""
    lines = [
        '# Learned Context',
        '',
        f'*Auto-generated by Learning Processor at {datetime.now().isoformat()}*',
        '',
        '## Core Values (Fleiß, Ehrgeiz, Biss)',
        '',
        '- **Proaktiv handeln** — Nicht warten, sondern vorangehen',
        '- **Bei Fehlern:** 3 Alternativen probieren, dann nochmal 3, dann erst fragen',
        '- **Enttäuschung als Treibstoff** — Doppelter Fleiß bis es funktioniert',
        '',
        '## Learned Preferences',
        '',
    ]

    if prefs.get('explicit'):
        for key, value in prefs['explicit'].items():
            lines.append(f'- **{key}:** {value}')

    lines.extend(['', '## Active Topics', ''])
    real_topics = [t for t in prefs.get('topTopics', []) if t['topic'].lower() not in STOPWORDS][:8]
    for t in real_topics:
        lines.append(f"- {t['topic']} ({t['count']} mentions)")

    if skill_gaps.get('priority'):
        lines.extend(['', '## Areas to Improve', ''])
        for gap in skill_gaps['priority']:
            lines.append(f"- **{gap['area']}:** {gap['frequency']} recent errors — study this!")

    lines.extend(['', '## Signal Summary', ''])
    lines.append(f"- Positive signals: {prefs.get('signalCounts', {}).get('positive', 0)}")
    lines.append(f"- Corrections needed: {prefs.get('signalCounts', {}).get('corrections', 0)}")

    return '\n'.join(lines)

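# Sketch of the markdown produced by generate_learning_context(); headings come
# from the code above, counts and topic names are placeholder values:
#   # Learned Context
#   ## Core Values (Fleiß, Ehrgeiz, Biss)
#   ## Learned Preferences
#   - **responseLength:** concise
#   ## Active Topics
#   - Matrix (12 mentions)
#   ## Areas to Improve
#   - **npm:** 3 recent errors — study this!
#   ## Signal Summary
#   - Positive signals: 4
#   - Corrections needed: 1
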
# --- NATS Event Fetching ---

async def fetch_nats_events(nats_url: str, hours: float = 24) -> list[dict]:
    """Fetch recent events from NATS JetStream."""
    if not HAS_NATS:
        print("❌ nats-py not installed. Use: pip install nats-py", file=sys.stderr)
        return []

    events = []
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    # Credentials come from the environment (with defaults)
    user = os.environ.get('NATS_USER', 'claudia')
    password = os.environ.get('NATS_PASSWORD', '')

    try:
        nc = await nats.connect(nats_url, user=user, password=password)
        js = nc.jetstream()

        # Get stream info
        try:
            stream = await js.stream_info('openclaw-events')
            last_seq = stream.state.last_seq
            start_seq = max(1, last_seq - 500)  # Last 500 events

            # Fetch messages by sequence
            for seq in range(start_seq, last_seq + 1):
                try:
                    msg = await js.get_msg('openclaw-events', seq)
                    event = json.loads(msg.data.decode())
                    if event.get('timestamp', 0) >= cutoff:
                        events.append(event)
                except Exception:
                    continue
        except Exception as e:
            print(f"❌ Error reading stream: {e}", file=sys.stderr)

        await nc.drain()
    except Exception as e:
        print(f"❌ NATS connection failed: {e}", file=sys.stderr)

    return events


def load_jsonl_events(source: str, hours: float = 24) -> list[dict]:
    """Load events from a JSONL file or stdin."""
    events = []
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    if source == '-':
        lines = sys.stdin
    else:
        path = Path(source)
        if not path.exists():
            print(f"❌ File not found: {source}", file=sys.stderr)
            return []
        lines = path.open()

    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
            # Filter by time if a timestamp is present
            ts = event.get('timestamp', event.get('timestampMs', 0))
            if ts == 0 or ts >= cutoff:
                events.append(event)
        except json.JSONDecodeError:
            continue

    if source != '-':
        lines.close()

    return events

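# Files written by run_learning_cycle() below, under <cortex_home>/learning/<agent>/:
#   preferences.json    - signal counts, derived flags, explicit defaults, top topics
#   behaviors.json      - rolling success/failure patterns (last 50 each)
#   topics.json         - topic -> count map from this run
#   skill-gaps.json     - error domains plus a top-5 "priority" list
#   learning-context.md - human-readable summary from generate_learning_context()
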
# --- Main ---

def run_learning_cycle(events: list[dict], agent: str = 'main') -> dict:
    """Run the learning cycle on the provided events."""
    ldir = learning_dir() / agent
    ldir.mkdir(parents=True, exist_ok=True)

    # File paths
    prefs_file = ldir / 'preferences.json'
    behaviors_file = ldir / 'behaviors.json'
    topics_file = ldir / 'topics.json'
    skill_gaps_file = ldir / 'skill-gaps.json'
    context_file = ldir / 'learning-context.md'

    print('🧠 Learning Processor', file=sys.stderr)
    print(f' Events: {len(events)}', file=sys.stderr)

    # Extract signals
    signals = extract_signals(events)
    print(f' Positive: {len(signals["positive"])}', file=sys.stderr)
    print(f' Negative: {len(signals["negative"])}', file=sys.stderr)
    print(f' Corrections: {len(signals["corrections"])}', file=sys.stderr)
    print(f' Topics: {len(signals["topics"])}', file=sys.stderr)

    # Load existing data
    existing_prefs = load_json(prefs_file, {})
    existing_behaviors = load_json(behaviors_file, {})

    # Derive new preferences and behaviors
    new_prefs = derive_preferences(signals, existing_prefs)
    new_behaviors = derive_behaviors(signals, existing_behaviors)

    # Save all files
    save_json(prefs_file, new_prefs)
    save_json(behaviors_file, new_behaviors)

    topics_data = {
        'lastAnalysis': datetime.now().isoformat(),
        'topics': signals['topics'],
    }
    save_json(topics_file, topics_data)

    skill_gaps_data = {
        'lastAnalysis': datetime.now().isoformat(),
        'gaps': signals['skill_gaps'],
        'priority': sorted(
            [{'area': k, 'frequency': v} for k, v in signals['skill_gaps'].items()],
            key=lambda x: -x['frequency']
        )[:5],
    }
    save_json(skill_gaps_file, skill_gaps_data)

    # Generate context markdown
    context_md = generate_learning_context(new_prefs, new_behaviors, topics_data, skill_gaps_data)
    context_file.write_text(context_md)

    print('\n✅ Learning cycle complete!', file=sys.stderr)
    print(f' Output: {ldir}', file=sys.stderr)

    # Print summary
    if new_prefs.get('topTopics'):
        print('\n📈 Top Topics:', file=sys.stderr)
        for t in new_prefs['topTopics'][:5]:
            print(f" - {t['topic']}: {t['count']}", file=sys.stderr)

    if skill_gaps_data['priority']:
        print('\n⚠️ Skill Gaps:', file=sys.stderr)
        for gap in skill_gaps_data['priority']:
            print(f" - {gap['area']}: {gap['frequency']} errors", file=sys.stderr)

    return {
        'preferences': new_prefs,
        'behaviors': new_behaviors,
        'topics': topics_data,
        'skill_gaps': skill_gaps_data,
    }


def main():
    parser = argparse.ArgumentParser(
        description='Extract preferences and patterns from NATS events',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  cortex learn --since 24h
  cortex learn --since 7d --nats nats://localhost:4222
  cortex learn --jsonl events.jsonl
  cat events.jsonl | cortex learn --jsonl -
''',
    )
    parser.add_argument('--since', default='24h', help='Time window (e.g. 24h, 7d, 30m)')
    parser.add_argument('--nats', default='nats://localhost:4222', help='NATS server URL')
    parser.add_argument('--jsonl', metavar='FILE', help='Load events from JSONL file (use - for stdin)')
    parser.add_argument('--agent', default='main', help='Agent name for output directory')
    parser.add_argument('--json', action='store_true', help='Output results as JSON')
    args = parser.parse_args()

    # Parse duration
    try:
        duration = parse_duration(args.since)
        hours = duration.total_seconds() / 3600
    except ValueError as e:
        print(f"❌ {e}", file=sys.stderr)
        sys.exit(1)

    # Fetch events
    if args.jsonl:
        events = load_jsonl_events(args.jsonl, hours)
    else:
        import asyncio
        events = asyncio.run(fetch_nats_events(args.nats, hours))

    if not events:
        print("⚠️ No events found", file=sys.stderr)
        sys.exit(0)

    # Run learning cycle
    results = run_learning_cycle(events, args.agent)

    if args.json:
        print(json.dumps(results, indent=2, ensure_ascii=False))


if __name__ == '__main__':
    main()