528 lines
18 KiB
Python
528 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""Cortex Learn — Extract preferences and behavior patterns from NATS events.
|
|
|
|
Ported from ~/clawd/scripts/learning-processor.mjs
|
|
|
|
Usage:
|
|
cortex learn --since 24h [--nats nats://localhost:4222]
|
|
cortex learn --jsonl input.jsonl
|
|
cat events.jsonl | cortex learn --jsonl -
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
from cortex.config import cortex_home
|
|
|
|
# Try to import nats (optional dependency)
|
|
try:
|
|
import nats
|
|
from nats.js.api import StreamSource
|
|
HAS_NATS = True
|
|
except ImportError:
|
|
HAS_NATS = False
|
|
|
|
|
|
# --- Directories ---
|
|
|
|
def learning_dir() -> Path:
    """Return the directory under cortex home where learning artifacts are written."""
    return cortex_home().joinpath("learning")
|
|
|
|
|
|
# --- Constants ---
|
|
|
|
# German + English stopwords filtered out of extracted topics.
# NOTE: all entries are lowercase; lookups must lowercase first.
STOPWORDS = {
    # German
    'nicht', 'dass', 'wenn', 'dann', 'aber', 'oder', 'und', 'der', 'die', 'das',
    'ein', 'eine', 'einer', 'ist', 'sind', 'war', 'waren', 'wird', 'werden',
    'hat', 'haben', 'kann', 'können', 'muss', 'müssen', 'soll', 'sollte',
    'würde', 'hier', 'dort', 'jetzt', 'noch', 'schon', 'auch', 'nur',
    'sehr', 'mehr', 'viel', 'ganz', 'alle', 'allem', 'allen', 'aller', 'alles',
    'heute', 'morgen', 'gestern', 'immer', 'wieder', 'also', 'dabei', 'damit',
    'danach', 'darin', 'darum', 'davon', 'dazu', 'denn', 'doch', 'durch',
    # English
    'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'had',
    'her', 'was', 'one', 'our', 'out', 'has', 'have', 'been', 'would', 'could',
    'this', 'that', 'with', 'from', 'what', 'which', 'their', 'will', 'there',
}
|
|
|
|
# Signal phrase/emoji regex sources used for preference extraction
# (German-centric wording plus common reaction emojis).
POSITIVE_PATTERNS = [
    r'\bmega\b', r'\bsuper\b', r'\bperfekt\b', r'\bgenau\s*so\b', r'\bja\s*!',
    r'\bcool\b', r'gut\s*gemacht', r'das\s*war\s*gut', r'gefällt\s*mir',
    r'\bklasse\b', r'\btoll\b', r'👍', r'🎉', r'💪', r'❤️', r'😘', r'🚀', r'✅', r'💯',
]

NEGATIVE_PATTERNS = [
    r'\bnein,\s*ich', r'nicht\s*so,', r'\bfalsch\b', r'\bstopp\b',
    r'das\s*stimmt\s*nicht', r'ich\s*meinte\s*eigentlich', r'❌', r'👎',
]

CORRECTION_PATTERNS = [
    r'nein,?\s*ich\s*meinte',
    r'nicht\s*so,?\s*sondern',
    r'das\s*ist\s*falsch',
    r'eigentlich\s*wollte\s*ich',
]


def _compile_ci(sources: list[str]) -> list[re.Pattern]:
    """Compile regex sources case-insensitively, preserving list order."""
    return [re.compile(source, re.IGNORECASE) for source in sources]


# Compiled once at import time so the per-event matching loops don't recompile.
POSITIVE_RE = _compile_ci(POSITIVE_PATTERNS)
NEGATIVE_RE = _compile_ci(NEGATIVE_PATTERNS)
CORRECTION_RE = _compile_ci(CORRECTION_PATTERNS)
|
|
|
|
|
|
# --- Helper Functions ---
|
|
|
|
def parse_duration(duration_str: str) -> timedelta:
    """Parse a shorthand duration such as '24h', '7d' or '30m' into a timedelta.

    Case-insensitive; accepts exactly one integer followed by h (hours),
    d (days) or m (minutes).

    Raises:
        ValueError: if the string does not match that format.
    """
    parsed = re.match(r'^(\d+)([hdm])$', duration_str.lower())
    if parsed is None:
        raise ValueError(f"Invalid duration format: {duration_str}. Use like '24h', '7d', '30m'")

    amount = int(parsed.group(1))
    # Map the unit suffix to the matching timedelta keyword argument.
    unit_kwarg = {'h': 'hours', 'd': 'days', 'm': 'minutes'}[parsed.group(2)]
    return timedelta(**{unit_kwarg: amount})
|
|
|
|
|
|
def load_json(path: Path, default: Any = None) -> Any:
    """Read *path* as JSON; return *default* (or {} when default is None) if absent.

    NOTE: a file that exists but holds invalid JSON still raises — only the
    missing-file case falls back to the default.
    """
    if not path.exists():
        return {} if default is None else default
    return json.loads(path.read_text())
|
|
|
|
|
|
def save_json(path: Path, data: Any) -> None:
    """Write *data* as pretty-printed UTF-8 JSON, creating parent dirs as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    path.write_text(serialized)
|
|
|
|
|
|
def extract_text(event: dict) -> str:
    """Pull the human-readable text out of the known event shapes.

    Checked in order: payload.data.text, payload.content, payload.text_preview
    (a list of {'text': ...} dicts, joined with spaces), then top-level
    data.text. Returns '' when none of those paths yields text.
    """
    payload = event.get('payload')
    if isinstance(payload, dict):
        data = payload.get('data')
        if isinstance(data, dict) and 'text' in data:
            return data['text']
        if 'content' in payload:
            return payload['content']
        if 'text_preview' in payload:
            previews = payload['text_preview']
            if isinstance(previews, list):
                # Non-dict entries in the preview list are ignored.
                return ' '.join(
                    entry.get('text', '') for entry in previews if isinstance(entry, dict)
                )

    top_data = event.get('data')
    if isinstance(top_data, dict) and 'text' in top_data:
        return top_data['text']

    return ''
|
|
|
|
|
|
# --- Signal Extraction ---
|
|
|
|
def extract_signals(events: list[dict]) -> dict:
    """Extract preference signals from events.

    Returns a dict with:
        positive/negative: first-matching sentiment hits ({timestamp, text, pattern})
        corrections: matched correction phrases ({timestamp, text})
        topics: capitalized word/phrase -> occurrence count
        skill_gaps: error-domain name (lowercase) -> occurrence count
        task_outcomes: lifecycle end/error events ({timestamp, success, session})
    """
    signals = {
        'positive': [],
        'negative': [],
        'corrections': [],
        'topics': {},       # topic -> count
        'skill_gaps': {},   # domain -> count
        'task_outcomes': [],
    }

    for event in events:
        timestamp = event.get('timestamp', event.get('timestampMs', 0))

        # Track task outcomes (lifecycle events) BEFORE the text check:
        # lifecycle events often carry no text payload and were previously
        # skipped entirely by the `continue` below, losing all outcomes.
        if 'lifecycle' in event.get('type', ''):
            payload = event.get('payload')
            # Guard each level: a non-dict payload/data must not raise.
            data = payload.get('data') if isinstance(payload, dict) else None
            phase = data.get('phase', '') if isinstance(data, dict) else ''
            if phase in ('end', 'error'):
                signals['task_outcomes'].append({
                    'timestamp': timestamp,
                    'success': phase == 'end',
                    'session': event.get('session', ''),
                })

        text = extract_text(event)
        if not text:
            continue

        # Sentiment/correction matching: first matching pattern per category
        # wins; only a 200-char preview of the text is stored.
        for pattern in POSITIVE_RE:
            if pattern.search(text):
                signals['positive'].append({
                    'timestamp': timestamp,
                    'text': text[:200],
                    'pattern': pattern.pattern,
                })
                break

        for pattern in NEGATIVE_RE:
            if pattern.search(text):
                signals['negative'].append({
                    'timestamp': timestamp,
                    'text': text[:200],
                    'pattern': pattern.pattern,
                })
                break

        for pattern in CORRECTION_RE:
            if pattern.search(text):
                signals['corrections'].append({
                    'timestamp': timestamp,
                    'text': text[:200],
                })
                break

        # Extract topics: capitalized words/phrases, dropping stopwords and
        # anything 3 characters or shorter.
        for topic in re.findall(r'[A-Z][a-zäöü]+(?:\s+[A-Z][a-zäöü]+)*', text):
            if len(topic) > 3 and topic.lower() not in STOPWORDS:
                signals['topics'][topic] = signals['topics'].get(topic, 0) + 1

        # Detect skill gaps: error mentions attributed to known tech domains
        # ('unknown' when no domain name appears in the text).
        if re.search(r'error|fehler|failed|nicht gefunden|module not found', text, re.IGNORECASE):
            domains = re.findall(r'TypeScript|NATS|Matrix|Python|Node|npm|git', text, re.IGNORECASE)
            for domain in (domains or ['unknown']):
                key = domain.lower()
                signals['skill_gaps'][key] = signals['skill_gaps'].get(key, 0) + 1

    return signals
|
|
|
|
|
|
def derive_preferences(signals: dict, existing: dict) -> dict:
    """Derive an updated preferences dict from extracted signals.

    Args:
        signals: Output of extract_signals (needs 'positive', 'negative',
            'corrections' and 'topics' keys).
        existing: Previously stored preferences; carried over and updated.

    Returns:
        New preferences dict with lastAnalysis, signalCounts, derived flags,
        topTopics (top 10 by count) and explicit preference defaults.
    """
    prefs = {**existing}

    positive_count = len(signals['positive'])
    negative_count = len(signals['negative'])
    correction_count = len(signals['corrections'])

    prefs['lastAnalysis'] = datetime.now().isoformat()
    prefs['signalCounts'] = {
        'positive': positive_count,
        'negative': negative_count,
        'corrections': correction_count,
    }

    # Derive insights (flags accumulate on top of previously derived ones)
    prefs['derived'] = prefs.get('derived', {})

    # Many corrections in one window suggest instructions are being misread.
    if correction_count > 5:
        prefs['derived']['needsMoreClarification'] = True

    # A strongly positive balance suggests the current approach works.
    if positive_count > negative_count * 2:
        prefs['derived']['approachWorking'] = True

    # Top topics by mention count
    top_topics = sorted(signals['topics'].items(), key=lambda x: -x[1])[:10]
    prefs['topTopics'] = [{'topic': t, 'count': c} for t, c in top_topics]

    # Explicit preference defaults: only fill in keys that are not already
    # present, so stored explicit preferences are never clobbered.
    # (Previously this dict unconditionally overwrote prefs['explicit'],
    # wiping anything learned or configured between runs.)
    defaults = {
        'responseLength': 'concise',
        'technicalDepth': 'high',
        'humor': 'appreciated',
        'tablesInTelegram': 'never',
        'proactiveSuggestions': 'always',
        'fleiß': 'mandatory',
        'ehrgeiz': 'mandatory',
        'biss': 'mandatory',
    }
    prefs['explicit'] = {**defaults, **prefs.get('explicit', {})}

    return prefs
|
|
|
|
|
|
def derive_behaviors(signals: dict, existing: dict) -> dict:
    """Derive behavior patterns from signals, merged over *existing*.

    Appends the most recent positive signals to successPatterns and the most
    recent negative signals to failurePatterns, then trims both to the last 50.
    """
    behaviors = {**existing}
    behaviors['lastAnalysis'] = datetime.now().isoformat()

    behaviors['successPatterns'] = behaviors.get('successPatterns', [])
    behaviors['failurePatterns'] = behaviors.get('failurePatterns', [])

    # Recent positive signals indicate successful patterns
    for pos in signals['positive'][-10:]:
        behaviors['successPatterns'].append({
            'timestamp': pos['timestamp'],
            'context': pos['text'][:100],
        })

    # Recent negative signals indicate patterns to avoid. (Previously
    # failurePatterns was initialized and trimmed but never populated.)
    for neg in signals['negative'][-10:]:
        behaviors['failurePatterns'].append({
            'timestamp': neg['timestamp'],
            'context': neg['text'][:100],
        })

    # Keep only the most recent 50 entries of each kind
    behaviors['successPatterns'] = behaviors['successPatterns'][-50:]
    behaviors['failurePatterns'] = behaviors['failurePatterns'][-50:]

    return behaviors
|
|
|
|
|
|
def generate_learning_context(prefs: dict, behaviors: dict, topics: dict, skill_gaps: dict) -> str:
    """Render the LEARNING_CONTEXT markdown from the derived learning data.

    NOTE(review): *behaviors* and *topics* are currently unused here; they are
    accepted to keep the call signature stable for run_learning_cycle.
    """
    # Fixed header and core-values section
    out: list[str] = [
        '# Learned Context',
        '',
        f'*Auto-generated by Learning Processor at {datetime.now().isoformat()}*',
        '',
        '## Core Values (Fleiß, Ehrgeiz, Biss)',
        '',
        '- **Proaktiv handeln** — Nicht warten, sondern vorangehen',
        '- **Bei Fehlern:** 3 Alternativen probieren, dann nochmal 3, dann erst fragen',
        '- **Enttäuschung als Treibstoff** — Doppelter Fleiß bis es funktioniert',
        '',
        '## Learned Preferences',
        '',
    ]

    explicit = prefs.get('explicit')
    if explicit:
        out += [f'- **{key}:** {value}' for key, value in explicit.items()]

    out += ['', '## Active Topics', '']

    # Up to 8 topics that survive the stopword filter
    shown = [entry for entry in prefs.get('topTopics', [])
             if entry['topic'].lower() not in STOPWORDS][:8]
    out += [f"- {entry['topic']} ({entry['count']} mentions)" for entry in shown]

    if skill_gaps.get('priority'):
        out += ['', '## Areas to Improve', '']
        out += [
            f"- **{gap['area']}:** {gap['frequency']} recent errors — study this!"
            for gap in skill_gaps['priority']
        ]

    out += ['', '## Signal Summary', '']
    counts = prefs.get('signalCounts', {})
    out.append(f"- Positive signals: {counts.get('positive', 0)}")
    out.append(f"- Corrections needed: {counts.get('corrections', 0)}")

    return '\n'.join(out)
|
|
|
|
|
|
# --- NATS Event Fetching ---
|
|
|
|
async def fetch_nats_events(nats_url: str, hours: float = 24) -> list[dict]:
    """Fetch recent events from the 'openclaw-events' JetStream stream.

    Scans roughly the last 500 messages by sequence number and keeps events
    whose 'timestamp' falls within the last *hours* hours.

    Args:
        nats_url: NATS server URL, e.g. 'nats://localhost:4222'.
        hours: Size of the time window to keep.

    Returns:
        List of decoded event dicts. Returns [] (with a message on stderr)
        when nats-py is missing, the connection fails, or the stream cannot
        be read.
    """
    # nats-py is an optional dependency; bail out gracefully without it.
    if not HAS_NATS:
        print("❌ nats-py not installed. Use: pip install nats-py", file=sys.stderr)
        return []

    events: list[dict] = []
    # Cutoff in epoch milliseconds — assumes event['timestamp'] is also in
    # ms; TODO confirm against the event producer.
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    # Parse credentials from URL or environment
    # (defaults: user 'claudia', empty password)
    user = os.environ.get('NATS_USER', 'claudia')
    password = os.environ.get('NATS_PASSWORD', '')

    try:
        nc = await nats.connect(nats_url, user=user, password=password)
        js = nc.jetstream()

        # Get stream info to learn the newest sequence number
        try:
            stream = await js.stream_info('openclaw-events')
            last_seq = stream.state.last_seq
            start_seq = max(1, last_seq - 500)  # Last 500 events

            # Fetch messages by sequence. NOTE(review): this issues one
            # request per message; a pull consumer would batch this if the
            # scan ever becomes a bottleneck.
            for seq in range(start_seq, last_seq + 1):
                try:
                    msg = await js.get_msg('openclaw-events', seq)
                    event = json.loads(msg.data.decode())
                    if event.get('timestamp', 0) >= cutoff:
                        events.append(event)
                except Exception:
                    # Sequence gaps (deleted/expired messages) and bad JSON
                    # are expected; skip them quietly.
                    continue
        except Exception as e:
            print(f"❌ Error reading stream: {e}", file=sys.stderr)

        # Flush pending messages and close the connection gracefully.
        await nc.drain()
    except Exception as e:
        print(f"❌ NATS connection failed: {e}", file=sys.stderr)

    return events
|
|
|
|
|
|
def load_jsonl_events(source: str, hours: float = 24) -> list[dict]:
    """Load events from a JSONL file or stdin.

    Args:
        source: Path to a JSONL file, or '-' to read from stdin.
        hours: Keep only events whose timestamp (epoch ms) falls within the
            last *hours* hours; events with no timestamp are always kept.

    Returns:
        List of parsed event dicts; [] (with a message on stderr) when the
        file does not exist. Blank and non-JSON lines are skipped.
    """
    # Cutoff in epoch milliseconds, matching the events' timestamp unit.
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    def _parse(lines) -> list[dict]:
        # One JSON object per line; skip blanks and unparseable lines.
        events = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Filter by time if a timestamp is present (ts == 0 means absent)
            ts = event.get('timestamp', event.get('timestampMs', 0))
            if ts == 0 or ts >= cutoff:
                events.append(event)
        return events

    if source == '-':
        return _parse(sys.stdin)

    path = Path(source)
    if not path.exists():
        print(f"❌ File not found: {source}", file=sys.stderr)
        return []
    # Context manager guarantees the handle is closed even if parsing raises
    # (the previous open()/close() pair leaked it on mid-iteration errors).
    with path.open() as fh:
        return _parse(fh)
|
|
|
|
|
|
# --- Main ---
|
|
|
|
def run_learning_cycle(events: list[dict], agent: str = 'main') -> dict:
    """Run one full learning pass over *events* and persist the results.

    Writes preferences.json, behaviors.json, topics.json, skill-gaps.json and
    learning-context.md under <cortex_home>/learning/<agent>/, merging with
    any previously saved preferences/behaviors. Progress is logged to stderr.

    Args:
        events: Decoded event dicts (from NATS or a JSONL source).
        agent: Subdirectory name, keeping per-agent learning state separate.

    Returns:
        Dict with the freshly derived 'preferences', 'behaviors', 'topics'
        and 'skill_gaps' structures (the same data written to disk).
    """
    ldir = learning_dir() / agent
    ldir.mkdir(parents=True, exist_ok=True)

    # File paths for every persisted artifact
    prefs_file = ldir / 'preferences.json'
    behaviors_file = ldir / 'behaviors.json'
    topics_file = ldir / 'topics.json'
    skill_gaps_file = ldir / 'skill-gaps.json'
    context_file = ldir / 'learning-context.md'

    print(f'🧠 Learning Processor', file=sys.stderr)
    print(f' Events: {len(events)}', file=sys.stderr)

    # Extract signals and report counts per category
    signals = extract_signals(events)
    print(f' Positive: {len(signals["positive"])}', file=sys.stderr)
    print(f' Negative: {len(signals["negative"])}', file=sys.stderr)
    print(f' Corrections: {len(signals["corrections"])}', file=sys.stderr)
    print(f' Topics: {len(signals["topics"])}', file=sys.stderr)

    # Load existing data so derivations accumulate across runs
    existing_prefs = load_json(prefs_file, {})
    existing_behaviors = load_json(behaviors_file, {})

    # Derive new preferences and behaviors
    new_prefs = derive_preferences(signals, existing_prefs)
    new_behaviors = derive_behaviors(signals, existing_behaviors)

    # Save all files
    save_json(prefs_file, new_prefs)
    save_json(behaviors_file, new_behaviors)

    # Raw topic counts from this window
    topics_data = {
        'lastAnalysis': datetime.now().isoformat(),
        'topics': signals['topics'],
    }
    save_json(topics_file, topics_data)

    # Skill gaps, plus the top-5 most frequent as a priority list
    skill_gaps_data = {
        'lastAnalysis': datetime.now().isoformat(),
        'gaps': signals['skill_gaps'],
        'priority': sorted(
            [{'area': k, 'frequency': v} for k, v in signals['skill_gaps'].items()],
            key=lambda x: -x['frequency']
        )[:5],
    }
    save_json(skill_gaps_file, skill_gaps_data)

    # Generate context markdown for injection into the agent's prompt
    context_md = generate_learning_context(new_prefs, new_behaviors, topics_data, skill_gaps_data)
    context_file.write_text(context_md)

    print(f'\n✅ Learning cycle complete!', file=sys.stderr)
    print(f' Output: {ldir}', file=sys.stderr)

    # Print summary of top topics and skill gaps to stderr
    if new_prefs.get('topTopics'):
        print('\n📈 Top Topics:', file=sys.stderr)
        for t in new_prefs['topTopics'][:5]:
            print(f" - {t['topic']}: {t['count']}", file=sys.stderr)

    if skill_gaps_data['priority']:
        print('\n⚠️ Skill Gaps:', file=sys.stderr)
        for gap in skill_gaps_data['priority']:
            print(f" - {gap['area']}: {gap['frequency']} errors", file=sys.stderr)

    return {
        'preferences': new_prefs,
        'behaviors': new_behaviors,
        'topics': topics_data,
        'skill_gaps': skill_gaps_data,
    }
|
|
|
|
|
|
def main():
    """CLI entry point: parse args, gather events, run the learning cycle."""
    parser = argparse.ArgumentParser(
        description='Extract preferences and patterns from NATS events',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  cortex learn --since 24h
  cortex learn --since 7d --nats nats://localhost:4222
  cortex learn --jsonl events.jsonl
  cat events.jsonl | cortex learn --jsonl -
'''
    )
    parser.add_argument('--since', default='24h',
                        help='Time window (e.g. 24h, 7d, 30m)')
    parser.add_argument('--nats', default='nats://localhost:4222',
                        help='NATS server URL')
    parser.add_argument('--jsonl', metavar='FILE',
                        help='Load events from JSONL file (use - for stdin)')
    parser.add_argument('--agent', default='main',
                        help='Agent name for output directory')
    parser.add_argument('--json', action='store_true',
                        help='Output results as JSON')

    args = parser.parse_args()

    # Parse the --since window into (possibly fractional) hours.
    try:
        duration = parse_duration(args.since)
        hours = duration.total_seconds() / 3600
    except ValueError as e:
        print(f"❌ {e}", file=sys.stderr)
        sys.exit(1)

    # Fetch events, passing the fractional hour count through unchanged:
    # the previous int(hours) truncated sub-hour windows, so '--since 30m'
    # became a 0-hour (empty) window even though the help text offers '30m'.
    if args.jsonl:
        events = load_jsonl_events(args.jsonl, hours)
    else:
        import asyncio
        events = asyncio.run(fetch_nats_events(args.nats, hours))

    if not events:
        print("⚠️ No events found", file=sys.stderr)
        sys.exit(0)

    # Run learning cycle (persists outputs and logs a summary to stderr)
    results = run_learning_cycle(events, args.agent)

    # Optionally echo the full results as JSON on stdout for piping.
    if args.json:
        print(json.dumps(results, indent=2, ensure_ascii=False))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|