darkplex-core/cortex/learn.py
Claudia 0123ec7090
All checks were successful
Tests / test (push) Successful in 3s
fix: format specifier crash when stream_info is None
2026-02-09 12:51:56 +01:00

528 lines
18 KiB
Python

#!/usr/bin/env python3
"""Cortex Learn — Extract preferences and behavior patterns from NATS events.
Ported from ~/clawd/scripts/learning-processor.mjs
Usage:
cortex learn --since 24h [--nats nats://localhost:4222]
cortex learn --jsonl input.jsonl
cat events.jsonl | cortex learn --jsonl -
"""
import argparse
import hashlib
import json
import os
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Optional
from cortex.config import cortex_home
# Try to import nats (optional dependency)
try:
import nats
from nats.js.api import StreamSource
HAS_NATS = True
except ImportError:
HAS_NATS = False
# --- Directories ---
def learning_dir() -> Path:
    """Return the directory under the cortex home where learning output lives."""
    return Path(cortex_home(), "learning")
# --- Constants ---
# German + English stopwords to filter from topics
STOPWORDS = {
    # German
    'nicht', 'dass', 'wenn', 'dann', 'aber', 'oder', 'und', 'der', 'die', 'das',
    'ein', 'eine', 'einer', 'ist', 'sind', 'war', 'waren', 'wird', 'werden',
    'hat', 'haben', 'kann', 'können', 'muss', 'müssen', 'soll', 'sollte',
    'würde', 'hier', 'dort', 'jetzt', 'noch', 'schon', 'auch', 'nur',
    'sehr', 'mehr', 'viel', 'ganz', 'alle', 'allem', 'allen', 'aller', 'alles',
    'heute', 'morgen', 'gestern', 'immer', 'wieder', 'also', 'dabei', 'damit',
    'danach', 'darin', 'darum', 'davon', 'dazu', 'denn', 'doch', 'durch',
    # English
    'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'had',
    'her', 'was', 'one', 'our', 'out', 'has', 'have', 'been', 'would', 'could',
    'this', 'that', 'with', 'from', 'what', 'which', 'their', 'will', 'there',
}
# Signal patterns for preference extraction
POSITIVE_PATTERNS = [
    r'\bmega\b', r'\bsuper\b', r'\bperfekt\b', r'\bgenau\s*so\b', r'\bja\s*!',
    r'\bcool\b', r'gut\s*gemacht', r'das\s*war\s*gut', r'gefällt\s*mir',
    r'\bklasse\b', r'\btoll\b', r'👍', r'🎉', r'💪', r'❤️', r'😘', r'🚀', r'💯',
]
NEGATIVE_PATTERNS = [
    r'\bnein,\s*ich', r'nicht\s*so,', r'\bfalsch\b', r'\bstopp\b',
    r'das\s*stimmt\s*nicht', r'ich\s*meinte\s*eigentlich', r'👎',
]
CORRECTION_PATTERNS = [
    r'nein,?\s*ich\s*meinte',
    r'nicht\s*so,?\s*sondern',
    r'das\s*ist\s*falsch',
    r'eigentlich\s*wollte\s*ich',
]
# Compile patterns once for efficiency. The `if p` filter guards against
# empty patterns: an empty regex matches EVERY string, which would flag
# every event as a signal. Two such r'' entries (likely emoji lost in an
# encoding mishap) were removed from the lists above.
POSITIVE_RE = [re.compile(p, re.IGNORECASE) for p in POSITIVE_PATTERNS if p]
NEGATIVE_RE = [re.compile(p, re.IGNORECASE) for p in NEGATIVE_PATTERNS if p]
CORRECTION_RE = [re.compile(p, re.IGNORECASE) for p in CORRECTION_PATTERNS if p]
# --- Helper Functions ---
def parse_duration(duration_str: str) -> timedelta:
    """Convert a duration string such as '24h', '7d' or '30m' to a timedelta.

    Raises:
        ValueError: if the string is not of the form <number><h|d|m>.
    """
    matched = re.match(r'^(\d+)([hdm])$', duration_str.lower())
    if matched is None:
        raise ValueError(f"Invalid duration format: {duration_str}. Use like '24h', '7d', '30m'")
    amount = int(matched.group(1))
    # Map the unit suffix to the matching timedelta keyword.
    unit_kwarg = {'h': 'hours', 'd': 'days', 'm': 'minutes'}[matched.group(2)]
    return timedelta(**{unit_kwarg: amount})
def load_json(path: Path, default: Any = None) -> Any:
    """Read *path* as JSON; return *default* (or {} when None) if the file is absent."""
    if not path.exists():
        return {} if default is None else default
    return json.loads(path.read_text())
def save_json(path: Path, data: Any) -> None:
    """Write *data* as pretty-printed, non-ASCII-escaped JSON, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    path.write_text(serialized)
def extract_text(event: dict) -> str:
    """Pull the human-readable text out of an event, trying known shapes in order.

    Lookup order: payload.data.text, payload.content, payload.text_preview
    (a list of {'text': ...} dicts, joined with spaces), then event.data.text.
    Returns '' when none of these are present.
    """
    payload = event.get('payload')
    if isinstance(payload, dict):
        inner = payload.get('data')
        if isinstance(inner, dict) and 'text' in inner:
            return inner['text']
        if 'content' in payload:
            return payload['content']
        if 'text_preview' in payload:
            previews = payload['text_preview']
            if isinstance(previews, list):
                parts = [p.get('text', '') for p in previews if isinstance(p, dict)]
                return ' '.join(parts)
    top_data = event.get('data')
    if isinstance(top_data, dict) and 'text' in top_data:
        return top_data['text']
    return ''
# --- Signal Extraction ---
def extract_signals(events: list[dict]) -> dict:
"""Extract preference signals from events."""
signals = {
'positive': [],
'negative': [],
'corrections': [],
'topics': {}, # topic -> count
'skill_gaps': {}, # domain -> count
'task_outcomes': [],
}
for event in events:
text = extract_text(event)
if not text:
continue
timestamp = event.get('timestamp', event.get('timestampMs', 0))
# Check for positive signals
for pattern in POSITIVE_RE:
if pattern.search(text):
signals['positive'].append({
'timestamp': timestamp,
'text': text[:200],
'pattern': pattern.pattern,
})
break
# Check for negative signals
for pattern in NEGATIVE_RE:
if pattern.search(text):
signals['negative'].append({
'timestamp': timestamp,
'text': text[:200],
'pattern': pattern.pattern,
})
break
# Check for corrections
for pattern in CORRECTION_RE:
if pattern.search(text):
signals['corrections'].append({
'timestamp': timestamp,
'text': text[:200],
})
break
# Extract topics (capitalized words/phrases)
topic_matches = re.findall(r'[A-Z][a-zäöü]+(?:\s+[A-Z][a-zäöü]+)*', text)
for topic in topic_matches:
if len(topic) > 3 and topic.lower() not in STOPWORDS:
signals['topics'][topic] = signals['topics'].get(topic, 0) + 1
# Detect skill gaps (errors, retries)
if re.search(r'error|fehler|failed|nicht gefunden|module not found', text, re.IGNORECASE):
domains = re.findall(r'TypeScript|NATS|Matrix|Python|Node|npm|git', text, re.IGNORECASE)
for domain in (domains or ['unknown']):
d = domain.lower()
signals['skill_gaps'][d] = signals['skill_gaps'].get(d, 0) + 1
# Track task outcomes (lifecycle events)
event_type = event.get('type', '')
if 'lifecycle' in event_type:
phase = event.get('payload', {}).get('data', {}).get('phase', '')
if phase in ('end', 'error'):
signals['task_outcomes'].append({
'timestamp': timestamp,
'success': phase == 'end',
'session': event.get('session', ''),
})
return signals
def derive_preferences(signals: dict, existing: dict) -> dict:
    """Merge freshly extracted signals into the existing preference record."""
    prefs = dict(existing)
    counts = {
        'positive': len(signals['positive']),
        'negative': len(signals['negative']),
        'corrections': len(signals['corrections']),
    }
    prefs['lastAnalysis'] = datetime.now().isoformat()
    prefs['signalCounts'] = counts
    # Derived insights from signal ratios.
    derived = prefs.setdefault('derived', {})
    if counts['corrections'] > 5:
        derived['needsMoreClarification'] = True
    if counts['positive'] > 2 * counts['negative']:
        derived['approachWorking'] = True
    # The ten most frequently mentioned topics, highest count first.
    ranked = sorted(signals['topics'].items(), key=lambda kv: -kv[1])
    prefs['topTopics'] = [{'topic': name, 'count': n} for name, n in ranked[:10]]
    # Explicit preferences (defaults)
    prefs['explicit'] = {
        'responseLength': 'concise',
        'technicalDepth': 'high',
        'humor': 'appreciated',
        'tablesInTelegram': 'never',
        'proactiveSuggestions': 'always',
        'fleiß': 'mandatory',
        'ehrgeiz': 'mandatory',
        'biss': 'mandatory',
    }
    return prefs
def derive_behaviors(signals: dict, existing: dict) -> dict:
    """Derive behavior patterns from signals.

    Appends the most recent positive signals to the success-pattern history
    and caps both histories at 50 entries. The *existing* dict (and the
    lists inside it) are left unmodified — the previous implementation
    appended to the caller's 'successPatterns' list in place.
    """
    behaviors = dict(existing)
    behaviors['lastAnalysis'] = datetime.now().isoformat()
    # Copy before extending so the caller's list is not mutated.
    success_patterns = list(behaviors.get('successPatterns', []))
    failure_patterns = list(behaviors.get('failurePatterns', []))
    # Recent positive signals indicate successful patterns.
    for pos in signals['positive'][-10:]:
        success_patterns.append({
            'timestamp': pos['timestamp'],
            'context': pos['text'][:100],
        })
    # Keep only the last 50 patterns of each kind.
    behaviors['successPatterns'] = success_patterns[-50:]
    behaviors['failurePatterns'] = failure_patterns[-50:]
    return behaviors
def generate_learning_context(prefs: dict, behaviors: dict, topics: dict, skill_gaps: dict) -> str:
    """Render the LEARNING_CONTEXT markdown from derived learning data.

    Note: *behaviors* and *topics* are currently unused; they are kept in
    the signature for interface stability.
    """
    out = [
        '# Learned Context',
        '',
        f'*Auto-generated by Learning Processor at {datetime.now().isoformat()}*',
        '',
        '## Core Values (Fleiß, Ehrgeiz, Biss)',
        '',
        '- **Proaktiv handeln** — Nicht warten, sondern vorangehen',
        '- **Bei Fehlern:** 3 Alternativen probieren, dann nochmal 3, dann erst fragen',
        '- **Enttäuschung als Treibstoff** — Doppelter Fleiß bis es funktioniert',
        '',
        '## Learned Preferences',
        '',
    ]
    explicit = prefs.get('explicit')
    if explicit:
        out.extend(f'- **{key}:** {value}' for key, value in explicit.items())
    out += ['', '## Active Topics', '']
    # Drop stopword "topics" that slipped through extraction; show at most 8.
    shown = [t for t in prefs.get('topTopics', []) if t['topic'].lower() not in STOPWORDS][:8]
    out.extend(f"- {t['topic']} ({t['count']} mentions)" for t in shown)
    priority = skill_gaps.get('priority')
    if priority:
        out += ['', '## Areas to Improve', '']
        out.extend(f"- **{gap['area']}:** {gap['frequency']} recent errors — study this!"
                   for gap in priority)
    out += ['', '## Signal Summary', '']
    counts = prefs.get('signalCounts', {})
    out.append(f"- Positive signals: {counts.get('positive', 0)}")
    out.append(f"- Corrections needed: {counts.get('corrections', 0)}")
    return '\n'.join(out)
# --- NATS Event Fetching ---
async def fetch_nats_events(nats_url: str, hours: int = 24) -> list[dict]:
    """Fetch recent events from NATS JetStream.

    Connects to *nats_url* using credentials from the NATS_USER/NATS_PASSWORD
    environment variables, reads up to the last 500 messages of the
    'openclaw-events' stream by sequence number, and returns those whose
    'timestamp' field (epoch milliseconds) falls within the last *hours*
    hours. Returns [] when nats-py is not installed or on any
    connection/stream error (errors are logged to stderr, never raised).
    """
    if not HAS_NATS:
        print("❌ nats-py not installed. Use: pip install nats-py", file=sys.stderr)
        return []
    events = []
    # Cutoff in epoch milliseconds, matching the events' timestamp unit.
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)
    # Parse credentials from URL or environment
    user = os.environ.get('NATS_USER', 'claudia')
    password = os.environ.get('NATS_PASSWORD', '')
    try:
        nc = await nats.connect(nats_url, user=user, password=password)
        js = nc.jetstream()
        # Get stream info
        try:
            stream = await js.stream_info('openclaw-events')
            last_seq = stream.state.last_seq
            start_seq = max(1, last_seq - 500)  # Last 500 events
            # Fetch messages by sequence
            for seq in range(start_seq, last_seq + 1):
                try:
                    msg = await js.get_msg('openclaw-events', seq)
                    event = json.loads(msg.data.decode())
                    if event.get('timestamp', 0) >= cutoff:
                        events.append(event)
                except Exception:
                    # A sequence may be deleted/expired or hold bad JSON; skip it.
                    continue
        except Exception as e:
            print(f"❌ Error reading stream: {e}", file=sys.stderr)
        await nc.drain()
    except Exception as e:
        print(f"❌ NATS connection failed: {e}", file=sys.stderr)
    return events
def load_jsonl_events(source: str, hours: int = 24) -> list[dict]:
    """Load events from a JSONL file or stdin ('-').

    Keeps events whose 'timestamp'/'timestampMs' (epoch milliseconds) falls
    within the last *hours* hours; events with no timestamp are kept.
    Malformed and non-object lines are skipped. Returns [] when the file
    does not exist.
    """
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    def _parse(lines) -> list[dict]:
        # Shared line-parsing loop for both the stdin and file cases.
        events = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A valid JSON line may still be a scalar or array; only dicts
            # are events (the old code crashed on `.get` for those).
            if not isinstance(event, dict):
                continue
            ts = event.get('timestamp', event.get('timestampMs', 0))
            if ts == 0 or ts >= cutoff:
                events.append(event)
        return events

    if source == '-':
        return _parse(sys.stdin)
    path = Path(source)
    if not path.exists():
        print(f"❌ File not found: {source}", file=sys.stderr)
        return []
    # Context manager guarantees the handle is closed even if parsing raises
    # (the original leaked it on any exception escaping the loop).
    with path.open(encoding='utf-8') as fh:
        return _parse(fh)
# --- Main ---
def run_learning_cycle(events: list[dict], agent: str = 'main') -> dict:
"""Run the learning cycle on provided events."""
ldir = learning_dir() / agent
ldir.mkdir(parents=True, exist_ok=True)
# File paths
prefs_file = ldir / 'preferences.json'
behaviors_file = ldir / 'behaviors.json'
topics_file = ldir / 'topics.json'
skill_gaps_file = ldir / 'skill-gaps.json'
context_file = ldir / 'learning-context.md'
print(f'🧠 Learning Processor', file=sys.stderr)
print(f' Events: {len(events)}', file=sys.stderr)
# Extract signals
signals = extract_signals(events)
print(f' Positive: {len(signals["positive"])}', file=sys.stderr)
print(f' Negative: {len(signals["negative"])}', file=sys.stderr)
print(f' Corrections: {len(signals["corrections"])}', file=sys.stderr)
print(f' Topics: {len(signals["topics"])}', file=sys.stderr)
# Load existing data
existing_prefs = load_json(prefs_file, {})
existing_behaviors = load_json(behaviors_file, {})
# Derive new preferences and behaviors
new_prefs = derive_preferences(signals, existing_prefs)
new_behaviors = derive_behaviors(signals, existing_behaviors)
# Save all files
save_json(prefs_file, new_prefs)
save_json(behaviors_file, new_behaviors)
topics_data = {
'lastAnalysis': datetime.now().isoformat(),
'topics': signals['topics'],
}
save_json(topics_file, topics_data)
skill_gaps_data = {
'lastAnalysis': datetime.now().isoformat(),
'gaps': signals['skill_gaps'],
'priority': sorted(
[{'area': k, 'frequency': v} for k, v in signals['skill_gaps'].items()],
key=lambda x: -x['frequency']
)[:5],
}
save_json(skill_gaps_file, skill_gaps_data)
# Generate context markdown
context_md = generate_learning_context(new_prefs, new_behaviors, topics_data, skill_gaps_data)
context_file.write_text(context_md)
print(f'\n✅ Learning cycle complete!', file=sys.stderr)
print(f' Output: {ldir}', file=sys.stderr)
# Print summary
if new_prefs.get('topTopics'):
print('\n📈 Top Topics:', file=sys.stderr)
for t in new_prefs['topTopics'][:5]:
print(f" - {t['topic']}: {t['count']}", file=sys.stderr)
if skill_gaps_data['priority']:
print('\n⚠️ Skill Gaps:', file=sys.stderr)
for gap in skill_gaps_data['priority']:
print(f" - {gap['area']}: {gap['frequency']} errors", file=sys.stderr)
return {
'preferences': new_prefs,
'behaviors': new_behaviors,
'topics': topics_data,
'skill_gaps': skill_gaps_data,
}
def main():
    """CLI entry point: parse arguments, gather events, run the learning cycle."""
    parser = argparse.ArgumentParser(
        description='Extract preferences and patterns from NATS events',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  cortex learn --since 24h
  cortex learn --since 7d --nats nats://localhost:4222
  cortex learn --jsonl events.jsonl
  cat events.jsonl | cortex learn --jsonl -
''',
    )
    parser.add_argument('--since', default='24h',
                        help='Time window (e.g. 24h, 7d, 30m)')
    parser.add_argument('--nats', default='nats://localhost:4222',
                        help='NATS server URL')
    parser.add_argument('--jsonl', metavar='FILE',
                        help='Load events from JSONL file (use - for stdin)')
    parser.add_argument('--agent', default='main',
                        help='Agent name for output directory')
    parser.add_argument('--json', action='store_true',
                        help='Output results as JSON')
    args = parser.parse_args()

    # Validate and convert the time window before touching any event source.
    try:
        hours = parse_duration(args.since).total_seconds() / 3600
    except ValueError as err:
        print(f"{err}", file=sys.stderr)
        sys.exit(1)

    # Events come either from a JSONL file/stdin or live from NATS.
    if args.jsonl:
        events = load_jsonl_events(args.jsonl, int(hours))
    else:
        import asyncio
        events = asyncio.run(fetch_nats_events(args.nats, int(hours)))

    if not events:
        print("⚠️ No events found", file=sys.stderr)
        sys.exit(0)

    results = run_learning_cycle(events, args.agent)
    if args.json:
        print(json.dumps(results, indent=2, ensure_ascii=False))


if __name__ == '__main__':
    main()