darkplex-core/cortex/context.py
Claudia 0123ec7090
All checks were successful
Tests / test (push) Successful in 3s
fix: format specifier crash when stream_info is None
2026-02-09 12:51:56 +01:00

433 lines
15 KiB
Python

#!/usr/bin/env python3
"""Cortex Context — Generate LEARNING_CONTEXT.md from NATS events.
Ported from ~/clawd/scripts/context-generator.mjs
Usage:
cortex context [--events 2000] [--output context.md]
cortex context --jsonl events.jsonl
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
from cortex.config import cortex_home
# --- Configuration ---
def learning_dir() -> Path:
    """Return the directory for learning output files (CORTEX_HOME/learning)."""
    return cortex_home().joinpath("learning")
# Topic keywords for classification.
# Maps topic label -> list of lowercase substrings. analyze_events() counts a
# topic hit when any keyword occurs as a substring of an event's lowercased
# text (at most one hit per topic per event).
TOPIC_KEYWORDS = {
    'coding': ['code', 'script', 'function', 'bug', 'error', 'python', 'javascript', 'node', 'npm'],
    'mondo-gate': ['mondo', 'mygate', 'fintech', 'payment', 'crypto', 'bitcoin', 'token', 'wallet'],
    'infrastructure': ['server', 'docker', 'systemd', 'service', 'nats', 'api', 'deploy', 'port'],
    'agents': ['agent', 'mona', 'vera', 'stella', 'viola', 'sub-agent', 'spawn', 'session'],
    'security': ['security', 'audit', 'vulnerability', 'password', 'credential', 'auth', 'vault'],
    'memory': ['memory', 'context', 'event', 'learning', 'brain', 'recall', 'remember'],
    'communication': ['matrix', 'telegram', 'email', 'message', 'chat', 'voice'],
    'business': ['meeting', 'pitch', 'investor', 'series', 'valuation', 'partner', 'strategy'],
    'files': ['file', 'read', 'write', 'edit', 'create', 'delete', 'directory'],
}
# --- NATS CLI Helpers ---
def get_nats_bin() -> str:
    """Locate the `nats` CLI binary.

    Probes a few well-known install locations; if none exists, returns
    the bare command name so normal PATH resolution takes over.
    """
    candidates = (
        os.path.expanduser('~/bin/nats'),
        '/usr/local/bin/nats',
        '/usr/bin/nats',
    )
    return next((p for p in candidates if os.path.exists(p)), 'nats')
def get_stream_info() -> Optional[dict]:
    """Query NATS for statistics on the 'openclaw-events' stream.

    Returns {'messages', 'first_seq', 'last_seq'} on success, or None
    when the CLI call fails, times out, or returns unparsable JSON
    (the error is reported on stderr).
    """
    try:
        proc = subprocess.run(
            [get_nats_bin(), 'stream', 'info', 'openclaw-events', '--json'],
            capture_output=True, text=True, timeout=10
        )
        if proc.returncode == 0:
            state = json.loads(proc.stdout).get('state', {})
            return {
                'messages': state.get('messages', 0),
                'first_seq': state.get('first_seq', 1),
                'last_seq': state.get('last_seq', 0),
            }
    except Exception as e:
        print(f"❌ Error getting stream info: {e}", file=sys.stderr)
    # Non-zero exit code (no message printed) or failure above.
    return None
def fetch_nats_events(start_seq: int, end_seq: int) -> list[dict]:
    """Fetch events from NATS by sequence range (inclusive on both ends).

    Each message's base64-encoded data is decoded and merged into the
    returned event dict alongside 'seq', 'subject', and 'time'. Sequences
    that fail to fetch or decode are skipped silently.
    """
    import base64  # hoisted: was previously re-imported inside the loop on every message

    nats_bin = get_nats_bin()
    events: list[dict] = []
    for seq in range(start_seq, end_seq + 1):
        try:
            result = subprocess.run(
                [nats_bin, 'stream', 'get', 'openclaw-events', str(seq), '--json'],
                capture_output=True, text=True, timeout=5
            )
            if result.returncode == 0:
                msg = json.loads(result.stdout)
                data = json.loads(base64.b64decode(msg.get('data', '')).decode('utf-8'))
                events.append({
                    'seq': msg.get('seq'),
                    'subject': msg.get('subject'),
                    'time': msg.get('time'),
                    **data
                })
                # Progress indicator — only after an actual append, so failed
                # fetches no longer re-print the same count repeatedly.
                if len(events) % 200 == 0:
                    print(f"\r Fetched {len(events)} events...", end='', file=sys.stderr)
        except Exception:
            continue
    print(file=sys.stderr)
    return events
def load_jsonl_events(source: str) -> list[dict]:
    """Load events from a JSONL file, or from stdin when source is '-'.

    Blank lines and lines that are not valid JSON are skipped.
    Returns [] (with a message on stderr) when the file does not exist.
    """
    def _parse(lines) -> list[dict]:
        # One JSON document per line; tolerate blanks and garbage lines.
        events = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            try:
                events.append(json.loads(line))
            except json.JSONDecodeError:
                continue
        return events

    if source == '-':
        return _parse(sys.stdin)
    path = Path(source)
    if not path.exists():
        print(f"❌ File not found: {source}", file=sys.stderr)
        return []
    # Context manager guarantees the handle is closed even if an unexpected
    # exception (e.g. UnicodeDecodeError) escapes — the old code leaked it.
    with path.open() as fh:
        return _parse(fh)
# --- Analysis ---
def extract_text(event: dict) -> str:
    """Extract human-readable text from the various event payload shapes.

    Checks, in order: payload.text_preview (list of {'type': 'text', ...}
    dicts), payload.data.text, payload.message, then top-level data.text.
    Returns '' when no text is found.
    """
    # Fields may be present but explicitly null in some events; coerce to {}
    # so the chained lookups don't crash on None (the old .get(..., {}) form
    # only covered the missing-key case, not the null-value case).
    payload = event.get('payload') or {}
    if payload.get('text_preview'):
        previews = payload['text_preview']
        if isinstance(previews, list):
            return ' '.join(
                p.get('text', '') for p in previews
                if isinstance(p, dict) and p.get('type') == 'text'
            )
    pdata = payload.get('data') or {}
    if pdata.get('text'):
        return pdata['text']
    if payload.get('message'):
        return payload['message']
    data = event.get('data') or {}
    if data.get('text'):
        return data['text']
    return ''
def analyze_events(events: list[dict]) -> dict:
    """Extract behavioral patterns from a list of events.

    Tallies message types, agents, sessions, tool usage, a simple DE/EN
    language heuristic, topic keyword hits, and time-of-day distribution.
    Returns the accumulator dict; note 'sessions' is a set and must be
    converted before JSON serialization.
    """
    import re  # hoisted: was previously re-imported inside the event loop

    # Compile the language heuristics once (previously the pattern strings
    # were rebuilt and re-searched uncompiled for every event with text).
    german_re = re.compile(
        r'[äöüß]|(?:^|\s)(ich|du|wir|das|ist|und|oder|nicht|ein|eine|für|auf|mit|von)(?:\s|$)',
        re.IGNORECASE)
    english_re = re.compile(
        r'(?:^|\s)(the|is|are|this|that|you|we|have|has|will|would|can|could)(?:\s|$)',
        re.IGNORECASE)

    patterns = {
        'languages': {'de': 0, 'en': 0},
        'topics': {},
        'tools': {},
        'time_of_day': {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0},
        'message_types': {},
        'agents': {},
        'sessions': set(),
        'user_messages': [],
        'assistant_messages': [],
    }
    for event in events:
        text = extract_text(event)
        text_lower = text.lower()

        # Message type tracking
        msg_type = event.get('type', 'unknown')
        patterns['message_types'][msg_type] = patterns['message_types'].get(msg_type, 0) + 1

        # Agent tracking
        if event.get('agent'):
            agent = event['agent']
            patterns['agents'][agent] = patterns['agents'].get(agent, 0) + 1

        # Session tracking
        if event.get('session'):
            patterns['sessions'].add(event['session'])

        # Track user vs assistant messages (snippets capped at 200 chars;
        # assistant snippets are deduplicated)
        if 'message_in' in msg_type or 'message.in' in msg_type:
            if len(text) > 5:
                patterns['user_messages'].append(text[:200])
        if 'message_out' in msg_type or 'message.out' in msg_type:
            if len(text) > 20:
                snippet = text[:200]
                if snippet not in patterns['assistant_messages']:
                    patterns['assistant_messages'].append(snippet)

        # Tool usage from tool events
        if 'tool' in msg_type and event.get('payload', {}).get('data', {}).get('name'):
            tool_name = event['payload']['data']['name']
            patterns['tools'][tool_name] = patterns['tools'].get(tool_name, 0) + 1

        # Language/topic analysis needs a minimum of text
        if len(text) < 5:
            continue

        # Language detection (simple umlaut/stop-word heuristic)
        if german_re.search(text):
            patterns['languages']['de'] += 1
        elif english_re.search(text):
            patterns['languages']['en'] += 1

        # Topic detection: at most one hit per topic per event
        for topic, keywords in TOPIC_KEYWORDS.items():
            if any(kw in text_lower for kw in keywords):
                patterns['topics'][topic] = patterns['topics'].get(topic, 0) + 1

        # Time of day. NOTE(review): fromtimestamp uses the host's local
        # timezone — the "Europe/Berlin" intent only holds if the host is
        # configured for it. ts > 1e12 is the milliseconds heuristic.
        ts = event.get('timestamp') or event.get('timestampMs')
        if ts:
            try:
                dt = datetime.fromtimestamp(ts / 1000 if ts > 1e12 else ts)
                hour = dt.hour
                if 6 <= hour < 12:
                    patterns['time_of_day']['morning'] += 1
                elif 12 <= hour < 18:
                    patterns['time_of_day']['afternoon'] += 1
                elif 18 <= hour < 23:
                    patterns['time_of_day']['evening'] += 1
                else:
                    patterns['time_of_day']['night'] += 1
            except Exception:
                pass
    return patterns
def generate_context_md(patterns: dict, event_count: int, stream_info: Optional[dict]) -> str:
    """Render the learning-context markdown from analyzed patterns.

    stream_info may be None (e.g. when events came from JSONL); the
    stream total then renders as '?'.
    """
    total_lang = patterns['languages']['de'] + patterns['languages']['en']
    primary_lang = 'Deutsch' if patterns['languages']['de'] > patterns['languages']['en'] else 'English'
    if total_lang > 0:
        de_pct = round(patterns['languages']['de'] / total_lang * 100)
        en_pct = round(patterns['languages']['en'] / total_lang * 100)
        lang_ratio = f"{de_pct}% DE / {en_pct}% EN"
    else:
        lang_ratio = 'mixed'
    # Sort topics / tools by frequency, keep the top 8 of each
    top_topics = sorted(patterns['topics'].items(), key=lambda x: -x[1])[:8]
    top_tools = sorted(patterns['tools'].items(), key=lambda x: -x[1])[:8]
    # Dominant time-of-day bucket (guard kept for an empty dict)
    max_time = max(patterns['time_of_day'].items(), key=lambda x: x[1]) if patterns['time_of_day'] else ('varied', 0)
    # Recent user messages: last 5 unique. dict.fromkeys preserves insertion
    # order while deduplicating — list(set(...)) produced hash-randomized,
    # non-deterministic output across runs.
    recent_user = list(dict.fromkeys(patterns['user_messages'][-20:]))[-5:]
    recent_user = [m.replace('\n', ' ')[:80] for m in recent_user]
    # stream_info can be None — only apply the ',' format spec to a real int
    stream_total = stream_info['messages'] if stream_info else None
    stream_str = f"{stream_total:,}" if isinstance(stream_total, int) else '?'
    md = f"""# Learning Context for Claudia (main agent)
*Auto-generated from {event_count} events*
*Stream total: {stream_str} events*
*Updated: {datetime.now().isoformat()}*
## User Preferences
| Preference | Value |
|------------|-------|
| **Primary Language** | {primary_lang} ({lang_ratio}) |
| **Peak Activity** | {max_time[0].capitalize()} |
| **Active Sessions** | {len(patterns['sessions'])} unique |
## Current Focus Areas (by frequency)
{chr(10).join(f"- **{t[0]}** — {t[1]} mentions" for t in top_topics) if top_topics else '- (analyzing patterns...)'}
## Most Used Tools
{chr(10).join(f"- `{t[0]}` — {t[1]}x" for t in top_tools) if top_tools else '- (no tool usage tracked)'}
## Activity Distribution
| Time | Events |
|------|--------|
| Morning (6-12) | {patterns['time_of_day']['morning']} |
| Afternoon (12-18) | {patterns['time_of_day']['afternoon']} |
| Evening (18-23) | {patterns['time_of_day']['evening']} |
| Night (23-6) | {patterns['time_of_day']['night']} |
## Agent Distribution
{chr(10).join(f"- **{a}**: {c} events" for a, c in sorted(patterns['agents'].items(), key=lambda x: -x[1])) if patterns['agents'] else '- main only'}
## Recent User Requests
{chr(10).join(f'> "{m}..."' for m in recent_user) if recent_user else '> (no recent messages captured)'}
---
## Insights for Response Generation
Based on the patterns above:
- **Language**: Respond primarily in {primary_lang}, mixing when appropriate
- **Focus**: Current work centers on {', '.join(t[0] for t in top_topics[:3]) or 'general tasks'}
- **Tools**: Frequently use {', '.join(t[0] for t in top_tools[:3]) or 'various tools'}
- **Timing**: Most active during {max_time[0]}
*This context helps maintain continuity and personalization across sessions.*
"""
    return md
# --- Main ---
def run_context_generation(events: list[dict], output_path: Optional[Path] = None,
                           agent: str = 'main', stream_info: Optional[dict] = None) -> dict:
    """Analyze events, then write learning-context.md and patterns.json.

    Output goes to output_path when given, otherwise to
    CORTEX_HOME/learning/<agent>/learning-context.md. Returns the
    JSON-serializable form of the analyzed patterns.
    """
    err = sys.stderr
    print("📊 Context Generator", file=err)
    print(f" Analyzing {len(events)} events...", file=err)

    patterns = analyze_events(events)
    for label, key in (('Topics', 'topics'), ('Tools', 'tools'),
                       ('User messages', 'user_messages')):
        print(f" {label}: {len(patterns[key])}", file=err)

    context_md = generate_context_md(patterns, len(events), stream_info)

    # Resolve destination: an explicit path wins over the per-agent default.
    out = output_path if output_path else learning_dir() / agent / 'learning-context.md'
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(context_md)
    print(f"✅ Written to {out}", file=err)

    # Persist patterns next to the markdown for debugging. Sets are not
    # JSON-serializable, and the message lists are trimmed to recent entries.
    serializable = dict(patterns)
    serializable['sessions'] = list(patterns['sessions'])
    serializable['user_messages'] = patterns['user_messages'][-20:]
    serializable['assistant_messages'] = patterns['assistant_messages'][-10:]

    patterns_out = out.parent / 'patterns.json'
    patterns_out.write_text(json.dumps(serializable, indent=2, ensure_ascii=False))
    print(f" Patterns saved to {patterns_out}", file=err)
    return serializable
def main():
    """CLI entry point: parse arguments, gather events, generate context."""
    parser = argparse.ArgumentParser(
        description='Generate LEARNING_CONTEXT.md from NATS events',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  cortex context --events 2000
  cortex context --output ~/custom-context.md
  cortex context --jsonl events.jsonl
'''
    )
    parser.add_argument('--events', type=int, default=1000,
                        help='Number of recent events to analyze (default: 1000)')
    parser.add_argument('--output', '-o', type=Path,
                        help='Output file path (default: CORTEX_HOME/learning/<agent>/learning-context.md)')
    parser.add_argument('--agent', default='main',
                        help='Agent name for output directory')
    parser.add_argument('--jsonl', metavar='FILE',
                        help='Load events from JSONL file (use - for stdin)')
    parser.add_argument('--json', action='store_true',
                        help='Output patterns as JSON')
    args = parser.parse_args()

    stream_info = None
    if args.jsonl:
        # Offline mode: read events from a file or stdin.
        events = load_jsonl_events(args.jsonl)
    else:
        # Live mode: pull the most recent sequence window from NATS.
        stream_info = get_stream_info()
        if stream_info is None:
            print("❌ Could not get stream info", file=sys.stderr)
            sys.exit(1)
        print(f" Stream has {stream_info['messages']:,} total events", file=sys.stderr)
        last_seq = stream_info['last_seq']
        start_seq = max(stream_info['first_seq'], last_seq - args.events + 1)
        print(f" Fetching seq {start_seq} - {last_seq}...", file=sys.stderr)
        events = fetch_nats_events(start_seq, last_seq)

    if not events:
        print("⚠️ No events found", file=sys.stderr)
        sys.exit(0)

    patterns = run_context_generation(events, args.output, args.agent, stream_info)
    if args.json:
        print(json.dumps(patterns, indent=2, ensure_ascii=False))


if __name__ == '__main__':
    main()