From 0123ec70903125d85df3b46e318539dcb619d946 Mon Sep 17 00:00:00 2001 From: Claudia Date: Mon, 9 Feb 2026 12:51:56 +0100 Subject: [PATCH] fix: format specifier crash when stream_info is None --- cortex/cli.py | 20 + cortex/context.py | 433 ++++++++++++++++++++ cortex/learn.py | 528 +++++++++++++++++++++++++ cortex/scheduler.py | 24 ++ cortex/sentinel.py | 807 ++++++++++++++++++++++++++++++++++++++ cortex/tracker.py | 617 +++++++++++++++++++++++++++++ tests/test_new_modules.py | 436 ++++++++++++++++++++ 7 files changed, 2865 insertions(+) create mode 100644 cortex/context.py create mode 100644 cortex/learn.py create mode 100644 cortex/sentinel.py create mode 100644 cortex/tracker.py create mode 100644 tests/test_new_modules.py diff --git a/cortex/cli.py b/cortex/cli.py index 1cad4f5..575354d 100644 --- a/cortex/cli.py +++ b/cortex/cli.py @@ -12,6 +12,10 @@ Usage: cortex validate --transcript --task "description" cortex search "query" [--memory-dir ~/.cortex/memory] cortex handoff --from --to --task "description" + cortex learn --since 24h [--nats url] [--jsonl file] + cortex context [--events 2000] [--output file] + cortex track scan|list|done|check + cortex sentinel scan|matches|report|stats cortex version """ @@ -71,6 +75,22 @@ def main(): from cortex.auto_handoff import main as handoff_main handoff_main() + elif cmd == "learn": + from cortex.learn import main as learn_main + learn_main() + + elif cmd == "context": + from cortex.context import main as context_main + context_main() + + elif cmd == "track": + from cortex.tracker import main as tracker_main + tracker_main() + + elif cmd == "sentinel": + from cortex.sentinel import main as sentinel_main + sentinel_main() + elif cmd in ("-h", "--help", "help"): print(__doc__.strip()) diff --git a/cortex/context.py b/cortex/context.py new file mode 100644 index 0000000..52046a1 --- /dev/null +++ b/cortex/context.py @@ -0,0 +1,433 @@ +#!/usr/bin/env python3 +"""Cortex Context — Generate LEARNING_CONTEXT.md from NATS 
events. + +Ported from ~/clawd/scripts/context-generator.mjs + +Usage: + cortex context [--events 2000] [--output context.md] + cortex context --jsonl events.jsonl +""" + +import argparse +import json +import os +import subprocess +import sys +from datetime import datetime +from pathlib import Path +from typing import Optional + +from cortex.config import cortex_home + + +# --- Configuration --- + +def learning_dir() -> Path: + """Directory for learning output files.""" + return cortex_home() / "learning" + + +# Topic keywords for classification +TOPIC_KEYWORDS = { + 'coding': ['code', 'script', 'function', 'bug', 'error', 'python', 'javascript', 'node', 'npm'], + 'mondo-gate': ['mondo', 'mygate', 'fintech', 'payment', 'crypto', 'bitcoin', 'token', 'wallet'], + 'infrastructure': ['server', 'docker', 'systemd', 'service', 'nats', 'api', 'deploy', 'port'], + 'agents': ['agent', 'mona', 'vera', 'stella', 'viola', 'sub-agent', 'spawn', 'session'], + 'security': ['security', 'audit', 'vulnerability', 'password', 'credential', 'auth', 'vault'], + 'memory': ['memory', 'context', 'event', 'learning', 'brain', 'recall', 'remember'], + 'communication': ['matrix', 'telegram', 'email', 'message', 'chat', 'voice'], + 'business': ['meeting', 'pitch', 'investor', 'series', 'valuation', 'partner', 'strategy'], + 'files': ['file', 'read', 'write', 'edit', 'create', 'delete', 'directory'], +} + + +# --- NATS CLI Helpers --- + +def get_nats_bin() -> str: + """Find nats binary.""" + # Try common locations + for path in [ + os.path.expanduser('~/bin/nats'), + '/usr/local/bin/nats', + '/usr/bin/nats', + ]: + if os.path.exists(path): + return path + # Fall back to PATH + return 'nats' + + +def get_stream_info() -> Optional[dict]: + """Get NATS stream info.""" + nats_bin = get_nats_bin() + try: + result = subprocess.run( + [nats_bin, 'stream', 'info', 'openclaw-events', '--json'], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + info = 
json.loads(result.stdout) + return { + 'messages': info.get('state', {}).get('messages', 0), + 'first_seq': info.get('state', {}).get('first_seq', 1), + 'last_seq': info.get('state', {}).get('last_seq', 0), + } + except Exception as e: + print(f"❌ Error getting stream info: {e}", file=sys.stderr) + return None + + +def fetch_nats_events(start_seq: int, end_seq: int) -> list[dict]: + """Fetch events from NATS by sequence range.""" + nats_bin = get_nats_bin() + events = [] + + for seq in range(start_seq, end_seq + 1): + try: + result = subprocess.run( + [nats_bin, 'stream', 'get', 'openclaw-events', str(seq), '--json'], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + msg = json.loads(result.stdout) + # Decode base64 data + import base64 + data_b64 = msg.get('data', '') + data = json.loads(base64.b64decode(data_b64).decode('utf-8')) + events.append({ + 'seq': msg.get('seq'), + 'subject': msg.get('subject'), + 'time': msg.get('time'), + **data + }) + except Exception: + continue + + # Progress indicator + if len(events) % 200 == 0 and len(events) > 0: + print(f"\r Fetched {len(events)} events...", end='', file=sys.stderr) + + print(file=sys.stderr) + return events + + +def load_jsonl_events(source: str) -> list[dict]: + """Load events from JSONL file or stdin.""" + events = [] + + if source == '-': + lines = sys.stdin + else: + path = Path(source) + if not path.exists(): + print(f"❌ File not found: {source}", file=sys.stderr) + return [] + lines = path.open() + + for line in lines: + line = line.strip() + if not line: + continue + try: + events.append(json.loads(line)) + except json.JSONDecodeError: + continue + + if source != '-': + lines.close() + + return events + + +# --- Analysis --- + +def extract_text(event: dict) -> str: + """Extract text content from various event formats.""" + # Try text_preview + if event.get('payload', {}).get('text_preview'): + previews = event['payload']['text_preview'] + if isinstance(previews, list): + 
return ' '.join(p.get('text', '') for p in previews if isinstance(p, dict) and p.get('type') == 'text') + + # Try payload.data.text + if event.get('payload', {}).get('data', {}).get('text'): + return event['payload']['data']['text'] + + # Try payload.message + if event.get('payload', {}).get('message'): + return event['payload']['message'] + + # Try data.text + if event.get('data', {}).get('text'): + return event['data']['text'] + + return '' + + +def analyze_events(events: list[dict]) -> dict: + """Extract patterns from events.""" + patterns = { + 'languages': {'de': 0, 'en': 0}, + 'topics': {}, + 'tools': {}, + 'time_of_day': {'morning': 0, 'afternoon': 0, 'evening': 0, 'night': 0}, + 'message_types': {}, + 'agents': {}, + 'sessions': set(), + 'user_messages': [], + 'assistant_messages': [], + } + + for event in events: + text = extract_text(event) + text_lower = text.lower() + + # Message type tracking + msg_type = event.get('type', 'unknown') + patterns['message_types'][msg_type] = patterns['message_types'].get(msg_type, 0) + 1 + + # Agent tracking + if event.get('agent'): + agent = event['agent'] + patterns['agents'][agent] = patterns['agents'].get(agent, 0) + 1 + + # Session tracking + if event.get('session'): + patterns['sessions'].add(event['session']) + + # Track user vs assistant messages + if 'message_in' in msg_type or 'message.in' in msg_type: + if len(text) > 5: + patterns['user_messages'].append(text[:200]) + if 'message_out' in msg_type or 'message.out' in msg_type: + if len(text) > 20: + snippet = text[:200] + if snippet not in patterns['assistant_messages']: + patterns['assistant_messages'].append(snippet) + + # Tool usage from tool events + if 'tool' in msg_type and event.get('payload', {}).get('data', {}).get('name'): + tool_name = event['payload']['data']['name'] + patterns['tools'][tool_name] = patterns['tools'].get(tool_name, 0) + 1 + + # Skip if no text for language/topic analysis + if len(text) < 5: + continue + + # Language detection 
(simple heuristic) + german_pattern = r'[äöüß]|(?:^|\s)(ich|du|wir|das|ist|und|oder|nicht|ein|eine|für|auf|mit|von)(?:\s|$)' + english_pattern = r'(?:^|\s)(the|is|are|this|that|you|we|have|has|will|would|can|could)(?:\s|$)' + + import re + if re.search(german_pattern, text, re.IGNORECASE): + patterns['languages']['de'] += 1 + elif re.search(english_pattern, text, re.IGNORECASE): + patterns['languages']['en'] += 1 + + # Topic detection + for topic, keywords in TOPIC_KEYWORDS.items(): + for kw in keywords: + if kw in text_lower: + patterns['topics'][topic] = patterns['topics'].get(topic, 0) + 1 + break + + # Time of day (Europe/Berlin) + ts = event.get('timestamp') or event.get('timestampMs') + if ts: + try: + dt = datetime.fromtimestamp(ts / 1000 if ts > 1e12 else ts) + hour = dt.hour + if 6 <= hour < 12: + patterns['time_of_day']['morning'] += 1 + elif 12 <= hour < 18: + patterns['time_of_day']['afternoon'] += 1 + elif 18 <= hour < 23: + patterns['time_of_day']['evening'] += 1 + else: + patterns['time_of_day']['night'] += 1 + except Exception: + pass + + return patterns + + +def generate_context_md(patterns: dict, event_count: int, stream_info: Optional[dict]) -> str: + """Generate learning context markdown.""" + total_lang = patterns['languages']['de'] + patterns['languages']['en'] + primary_lang = 'Deutsch' if patterns['languages']['de'] > patterns['languages']['en'] else 'English' + + if total_lang > 0: + de_pct = round(patterns['languages']['de'] / total_lang * 100) + en_pct = round(patterns['languages']['en'] / total_lang * 100) + lang_ratio = f"{de_pct}% DE / {en_pct}% EN" + else: + lang_ratio = 'mixed' + + # Sort topics by frequency + top_topics = sorted(patterns['topics'].items(), key=lambda x: -x[1])[:8] + + # Sort tools by usage + top_tools = sorted(patterns['tools'].items(), key=lambda x: -x[1])[:8] + + # Determine activity pattern + max_time = max(patterns['time_of_day'].items(), key=lambda x: x[1]) if patterns['time_of_day'] else ('varied', 0) + + # 
Recent user messages (last 5 unique) + recent_user = list(set(patterns['user_messages'][-20:]))[-5:] + recent_user = [m.replace('\n', ' ')[:80] for m in recent_user] + + stream_total = stream_info['messages'] if stream_info else None + stream_str = f"{stream_total:,}" if isinstance(stream_total, int) else '?' + + md = f"""# Learning Context for Claudia (main agent) + +*Auto-generated from {event_count} events* +*Stream total: {stream_str} events* +*Updated: {datetime.now().isoformat()}* + +## User Preferences + +| Preference | Value | +|------------|-------| +| **Primary Language** | {primary_lang} ({lang_ratio}) | +| **Peak Activity** | {max_time[0].capitalize()} | +| **Active Sessions** | {len(patterns['sessions'])} unique | + +## Current Focus Areas (by frequency) + +{chr(10).join(f"- **{t[0]}** — {t[1]} mentions" for t in top_topics) if top_topics else '- (analyzing patterns...)'} + +## Most Used Tools + +{chr(10).join(f"- `{t[0]}` — {t[1]}x" for t in top_tools) if top_tools else '- (no tool usage tracked)'} + +## Activity Distribution + +| Time | Events | +|------|--------| +| Morning (6-12) | {patterns['time_of_day']['morning']} | +| Afternoon (12-18) | {patterns['time_of_day']['afternoon']} | +| Evening (18-23) | {patterns['time_of_day']['evening']} | +| Night (23-6) | {patterns['time_of_day']['night']} | + +## Agent Distribution + +{chr(10).join(f"- **{a}**: {c} events" for a, c in sorted(patterns['agents'].items(), key=lambda x: -x[1])) if patterns['agents'] else '- main only'} + +## Recent User Requests + +{chr(10).join(f'> "{m}..."' for m in recent_user) if recent_user else '> (no recent messages captured)'} + +--- + +## Insights for Response Generation + +Based on the patterns above: +- **Language**: Respond primarily in {primary_lang}, mixing when appropriate +- **Focus**: Current work centers on {', '.join(t[0] for t in top_topics[:3]) or 'general tasks'} +- **Tools**: Frequently use {', '.join(t[0] for t in top_tools[:3]) or 'various tools'} +- 
**Timing**: Most active during {max_time[0]} + +*This context helps maintain continuity and personalization across sessions.* +""" + return md + + +# --- Main --- + +def run_context_generation(events: list[dict], output_path: Optional[Path] = None, + agent: str = 'main', stream_info: Optional[dict] = None) -> dict: + """Run context generation on events.""" + print(f"📊 Context Generator", file=sys.stderr) + print(f" Analyzing {len(events)} events...", file=sys.stderr) + + patterns = analyze_events(events) + + print(f" Topics: {len(patterns['topics'])}", file=sys.stderr) + print(f" Tools: {len(patterns['tools'])}", file=sys.stderr) + print(f" User messages: {len(patterns['user_messages'])}", file=sys.stderr) + + context_md = generate_context_md(patterns, len(events), stream_info) + + # Determine output path + if output_path: + out = output_path + else: + ldir = learning_dir() / agent + ldir.mkdir(parents=True, exist_ok=True) + out = ldir / 'learning-context.md' + + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(context_md) + + print(f"✅ Written to {out}", file=sys.stderr) + + # Save patterns for debugging + patterns_out = out.parent / 'patterns.json' + patterns_serializable = { + **patterns, + 'sessions': list(patterns['sessions']), + 'user_messages': patterns['user_messages'][-20:], + 'assistant_messages': patterns['assistant_messages'][-10:], + } + patterns_out.write_text(json.dumps(patterns_serializable, indent=2, ensure_ascii=False)) + print(f" Patterns saved to {patterns_out}", file=sys.stderr) + + return patterns_serializable + + +def main(): + parser = argparse.ArgumentParser( + description='Generate LEARNING_CONTEXT.md from NATS events', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=''' +Examples: + cortex context --events 2000 + cortex context --output ~/custom-context.md + cortex context --jsonl events.jsonl + ''' + ) + parser.add_argument('--events', type=int, default=1000, + help='Number of recent events to analyze 
(default: 1000)') + parser.add_argument('--output', '-o', type=Path, + help='Output file path (default: CORTEX_HOME/learning//learning-context.md)') + parser.add_argument('--agent', default='main', + help='Agent name for output directory') + parser.add_argument('--jsonl', metavar='FILE', + help='Load events from JSONL file (use - for stdin)') + parser.add_argument('--json', action='store_true', + help='Output patterns as JSON') + + args = parser.parse_args() + + stream_info = None + + if args.jsonl: + events = load_jsonl_events(args.jsonl) + else: + # Fetch from NATS + stream_info = get_stream_info() + if not stream_info: + print("❌ Could not get stream info", file=sys.stderr) + sys.exit(1) + + print(f" Stream has {stream_info['messages']:,} total events", file=sys.stderr) + + start_seq = max(stream_info['first_seq'], stream_info['last_seq'] - args.events + 1) + end_seq = stream_info['last_seq'] + + print(f" Fetching seq {start_seq} - {end_seq}...", file=sys.stderr) + events = fetch_nats_events(start_seq, end_seq) + + if not events: + print("⚠️ No events found", file=sys.stderr) + sys.exit(0) + + patterns = run_context_generation(events, args.output, args.agent, stream_info) + + if args.json: + print(json.dumps(patterns, indent=2, ensure_ascii=False)) + + +if __name__ == '__main__': + main() diff --git a/cortex/learn.py b/cortex/learn.py new file mode 100644 index 0000000..c481c4e --- /dev/null +++ b/cortex/learn.py @@ -0,0 +1,528 @@ +#!/usr/bin/env python3 +"""Cortex Learn — Extract preferences and behavior patterns from NATS events. 
+ +Ported from ~/clawd/scripts/learning-processor.mjs + +Usage: + cortex learn --since 24h [--nats nats://localhost:4222] + cortex learn --jsonl input.jsonl + cat events.jsonl | cortex learn --jsonl - +""" + +import argparse +import hashlib +import json +import os +import re +import sys +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Optional + +from cortex.config import cortex_home + +# Try to import nats (optional dependency) +try: + import nats + from nats.js.api import StreamSource + HAS_NATS = True +except ImportError: + HAS_NATS = False + + +# --- Directories --- + +def learning_dir() -> Path: + """Directory for learning output files.""" + return cortex_home() / "learning" + + +# --- Constants --- + +# German + English stopwords to filter from topics +STOPWORDS = { + # German + 'nicht', 'dass', 'wenn', 'dann', 'aber', 'oder', 'und', 'der', 'die', 'das', + 'ein', 'eine', 'einer', 'ist', 'sind', 'war', 'waren', 'wird', 'werden', + 'hat', 'haben', 'kann', 'können', 'muss', 'müssen', 'soll', 'sollte', + 'wird', 'würde', 'hier', 'dort', 'jetzt', 'noch', 'schon', 'auch', 'nur', + 'sehr', 'mehr', 'viel', 'ganz', 'alle', 'allem', 'allen', 'aller', 'alles', + 'heute', 'morgen', 'gestern', 'immer', 'wieder', 'also', 'dabei', 'damit', + 'danach', 'darin', 'darum', 'davon', 'dazu', 'denn', 'doch', 'durch', + # English + 'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'had', + 'her', 'was', 'one', 'our', 'out', 'has', 'have', 'been', 'would', 'could', + 'this', 'that', 'with', 'from', 'what', 'which', 'their', 'will', 'there', +} + +# Signal patterns for preference extraction +POSITIVE_PATTERNS = [ + r'\bmega\b', r'\bsuper\b', r'\bperfekt\b', r'\bgenau\s*so\b', r'\bja\s*!', + r'\bcool\b', r'gut\s*gemacht', r'das\s*war\s*gut', r'gefällt\s*mir', + r'\bklasse\b', r'\btoll\b', r'👍', r'🎉', r'💪', r'❤️', r'😘', r'🚀', r'✅', r'💯', +] + +NEGATIVE_PATTERNS = [ + r'\bnein,\s*ich', r'nicht\s*so,', r'\bfalsch\b', r'\bstopp\b', 
+ r'das\s*stimmt\s*nicht', r'ich\s*meinte\s*eigentlich', r'❌', r'👎', +] + +CORRECTION_PATTERNS = [ + r'nein,?\s*ich\s*meinte', + r'nicht\s*so,?\s*sondern', + r'das\s*ist\s*falsch', + r'eigentlich\s*wollte\s*ich', +] + +# Compile patterns for efficiency +POSITIVE_RE = [re.compile(p, re.IGNORECASE) for p in POSITIVE_PATTERNS] +NEGATIVE_RE = [re.compile(p, re.IGNORECASE) for p in NEGATIVE_PATTERNS] +CORRECTION_RE = [re.compile(p, re.IGNORECASE) for p in CORRECTION_PATTERNS] + + +# --- Helper Functions --- + +def parse_duration(duration_str: str) -> timedelta: + """Parse duration string like '24h', '7d', '30m' to timedelta.""" + match = re.match(r'^(\d+)([hdm])$', duration_str.lower()) + if not match: + raise ValueError(f"Invalid duration format: {duration_str}. Use like '24h', '7d', '30m'") + + value = int(match.group(1)) + unit = match.group(2) + + if unit == 'h': + return timedelta(hours=value) + elif unit == 'd': + return timedelta(days=value) + elif unit == 'm': + return timedelta(minutes=value) + else: + raise ValueError(f"Unknown unit: {unit}") + + +def load_json(path: Path, default: Any = None) -> Any: + """Load JSON file, returning default if not found.""" + if path.exists(): + return json.loads(path.read_text()) + return default if default is not None else {} + + +def save_json(path: Path, data: Any) -> None: + """Save data as JSON file.""" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data, indent=2, ensure_ascii=False)) + + +def extract_text(event: dict) -> str: + """Extract text content from various event formats.""" + # Try different paths where text might be + if isinstance(event.get('payload'), dict): + payload = event['payload'] + if 'data' in payload and isinstance(payload['data'], dict): + if 'text' in payload['data']: + return payload['data']['text'] + if 'content' in payload: + return payload['content'] + if 'text_preview' in payload: + previews = payload['text_preview'] + if isinstance(previews, list): + return ' 
'.join(p.get('text', '') for p in previews if isinstance(p, dict)) + + if 'data' in event and isinstance(event['data'], dict): + if 'text' in event['data']: + return event['data']['text'] + + return '' + + +# --- Signal Extraction --- + +def extract_signals(events: list[dict]) -> dict: + """Extract preference signals from events.""" + signals = { + 'positive': [], + 'negative': [], + 'corrections': [], + 'topics': {}, # topic -> count + 'skill_gaps': {}, # domain -> count + 'task_outcomes': [], + } + + for event in events: + text = extract_text(event) + if not text: + continue + + timestamp = event.get('timestamp', event.get('timestampMs', 0)) + + # Check for positive signals + for pattern in POSITIVE_RE: + if pattern.search(text): + signals['positive'].append({ + 'timestamp': timestamp, + 'text': text[:200], + 'pattern': pattern.pattern, + }) + break + + # Check for negative signals + for pattern in NEGATIVE_RE: + if pattern.search(text): + signals['negative'].append({ + 'timestamp': timestamp, + 'text': text[:200], + 'pattern': pattern.pattern, + }) + break + + # Check for corrections + for pattern in CORRECTION_RE: + if pattern.search(text): + signals['corrections'].append({ + 'timestamp': timestamp, + 'text': text[:200], + }) + break + + # Extract topics (capitalized words/phrases) + topic_matches = re.findall(r'[A-Z][a-zäöü]+(?:\s+[A-Z][a-zäöü]+)*', text) + for topic in topic_matches: + if len(topic) > 3 and topic.lower() not in STOPWORDS: + signals['topics'][topic] = signals['topics'].get(topic, 0) + 1 + + # Detect skill gaps (errors, retries) + if re.search(r'error|fehler|failed|nicht gefunden|module not found', text, re.IGNORECASE): + domains = re.findall(r'TypeScript|NATS|Matrix|Python|Node|npm|git', text, re.IGNORECASE) + for domain in (domains or ['unknown']): + d = domain.lower() + signals['skill_gaps'][d] = signals['skill_gaps'].get(d, 0) + 1 + + # Track task outcomes (lifecycle events) + event_type = event.get('type', '') + if 'lifecycle' in 
event_type: + phase = event.get('payload', {}).get('data', {}).get('phase', '') + if phase in ('end', 'error'): + signals['task_outcomes'].append({ + 'timestamp': timestamp, + 'success': phase == 'end', + 'session': event.get('session', ''), + }) + + return signals + + +def derive_preferences(signals: dict, existing: dict) -> dict: + """Derive preferences from extracted signals.""" + prefs = {**existing} + + positive_count = len(signals['positive']) + negative_count = len(signals['negative']) + correction_count = len(signals['corrections']) + + prefs['lastAnalysis'] = datetime.now().isoformat() + prefs['signalCounts'] = { + 'positive': positive_count, + 'negative': negative_count, + 'corrections': correction_count, + } + + # Derive insights + prefs['derived'] = prefs.get('derived', {}) + + if correction_count > 5: + prefs['derived']['needsMoreClarification'] = True + + if positive_count > negative_count * 2: + prefs['derived']['approachWorking'] = True + + # Top topics + top_topics = sorted(signals['topics'].items(), key=lambda x: -x[1])[:10] + prefs['topTopics'] = [{'topic': t, 'count': c} for t, c in top_topics] + + # Explicit preferences (defaults) + prefs['explicit'] = { + 'responseLength': 'concise', + 'technicalDepth': 'high', + 'humor': 'appreciated', + 'tablesInTelegram': 'never', + 'proactiveSuggestions': 'always', + 'fleiß': 'mandatory', + 'ehrgeiz': 'mandatory', + 'biss': 'mandatory', + } + + return prefs + + +def derive_behaviors(signals: dict, existing: dict) -> dict: + """Derive behavior patterns from signals.""" + behaviors = {**existing} + behaviors['lastAnalysis'] = datetime.now().isoformat() + + behaviors['successPatterns'] = behaviors.get('successPatterns', []) + behaviors['failurePatterns'] = behaviors.get('failurePatterns', []) + + # Recent positive signals indicate successful patterns + for pos in signals['positive'][-10:]: + behaviors['successPatterns'].append({ + 'timestamp': pos['timestamp'], + 'context': pos['text'][:100], + }) + + # Keep 
only last 50 patterns + behaviors['successPatterns'] = behaviors['successPatterns'][-50:] + behaviors['failurePatterns'] = behaviors['failurePatterns'][-50:] + + return behaviors + + +def generate_learning_context(prefs: dict, behaviors: dict, topics: dict, skill_gaps: dict) -> str: + """Generate LEARNING_CONTEXT.md content.""" + lines = [ + '# Learned Context', + '', + f'*Auto-generated by Learning Processor at {datetime.now().isoformat()}*', + '', + '## Core Values (Fleiß, Ehrgeiz, Biss)', + '', + '- **Proaktiv handeln** — Nicht warten, sondern vorangehen', + '- **Bei Fehlern:** 3 Alternativen probieren, dann nochmal 3, dann erst fragen', + '- **Enttäuschung als Treibstoff** — Doppelter Fleiß bis es funktioniert', + '', + '## Learned Preferences', + '', + ] + + if prefs.get('explicit'): + for key, value in prefs['explicit'].items(): + lines.append(f'- **{key}:** {value}') + + lines.extend(['', '## Active Topics', '']) + + real_topics = [t for t in prefs.get('topTopics', []) + if t['topic'].lower() not in STOPWORDS][:8] + + for t in real_topics: + lines.append(f"- {t['topic']} ({t['count']} mentions)") + + if skill_gaps.get('priority'): + lines.extend(['', '## Areas to Improve', '']) + for gap in skill_gaps['priority']: + lines.append(f"- **{gap['area']}:** {gap['frequency']} recent errors — study this!") + + lines.extend(['', '## Signal Summary', '']) + lines.append(f"- Positive signals: {prefs.get('signalCounts', {}).get('positive', 0)}") + lines.append(f"- Corrections needed: {prefs.get('signalCounts', {}).get('corrections', 0)}") + + return '\n'.join(lines) + + +# --- NATS Event Fetching --- + +async def fetch_nats_events(nats_url: str, hours: int = 24) -> list[dict]: + """Fetch recent events from NATS JetStream.""" + if not HAS_NATS: + print("❌ nats-py not installed. 
Use: pip install nats-py", file=sys.stderr) + return [] + + events = [] + cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000) + + # Parse credentials from URL or environment + user = os.environ.get('NATS_USER', 'claudia') + password = os.environ.get('NATS_PASSWORD', '') + + try: + nc = await nats.connect(nats_url, user=user, password=password) + js = nc.jetstream() + + # Get stream info + try: + stream = await js.stream_info('openclaw-events') + last_seq = stream.state.last_seq + start_seq = max(1, last_seq - 500) # Last 500 events + + # Fetch messages by sequence + for seq in range(start_seq, last_seq + 1): + try: + msg = await js.get_msg('openclaw-events', seq) + event = json.loads(msg.data.decode()) + if event.get('timestamp', 0) >= cutoff: + events.append(event) + except Exception: + continue + except Exception as e: + print(f"❌ Error reading stream: {e}", file=sys.stderr) + + await nc.drain() + except Exception as e: + print(f"❌ NATS connection failed: {e}", file=sys.stderr) + + return events + + +def load_jsonl_events(source: str, hours: int = 24) -> list[dict]: + """Load events from JSONL file or stdin.""" + events = [] + cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000) + + if source == '-': + lines = sys.stdin + else: + path = Path(source) + if not path.exists(): + print(f"❌ File not found: {source}", file=sys.stderr) + return [] + lines = path.open() + + for line in lines: + line = line.strip() + if not line: + continue + try: + event = json.loads(line) + # Filter by time if timestamp present + ts = event.get('timestamp', event.get('timestampMs', 0)) + if ts == 0 or ts >= cutoff: + events.append(event) + except json.JSONDecodeError: + continue + + if source != '-': + lines.close() + + return events + + +# --- Main --- + +def run_learning_cycle(events: list[dict], agent: str = 'main') -> dict: + """Run the learning cycle on provided events.""" + ldir = learning_dir() / agent + ldir.mkdir(parents=True, 
exist_ok=True) + + # File paths + prefs_file = ldir / 'preferences.json' + behaviors_file = ldir / 'behaviors.json' + topics_file = ldir / 'topics.json' + skill_gaps_file = ldir / 'skill-gaps.json' + context_file = ldir / 'learning-context.md' + + print(f'🧠 Learning Processor', file=sys.stderr) + print(f' Events: {len(events)}', file=sys.stderr) + + # Extract signals + signals = extract_signals(events) + print(f' Positive: {len(signals["positive"])}', file=sys.stderr) + print(f' Negative: {len(signals["negative"])}', file=sys.stderr) + print(f' Corrections: {len(signals["corrections"])}', file=sys.stderr) + print(f' Topics: {len(signals["topics"])}', file=sys.stderr) + + # Load existing data + existing_prefs = load_json(prefs_file, {}) + existing_behaviors = load_json(behaviors_file, {}) + + # Derive new preferences and behaviors + new_prefs = derive_preferences(signals, existing_prefs) + new_behaviors = derive_behaviors(signals, existing_behaviors) + + # Save all files + save_json(prefs_file, new_prefs) + save_json(behaviors_file, new_behaviors) + + topics_data = { + 'lastAnalysis': datetime.now().isoformat(), + 'topics': signals['topics'], + } + save_json(topics_file, topics_data) + + skill_gaps_data = { + 'lastAnalysis': datetime.now().isoformat(), + 'gaps': signals['skill_gaps'], + 'priority': sorted( + [{'area': k, 'frequency': v} for k, v in signals['skill_gaps'].items()], + key=lambda x: -x['frequency'] + )[:5], + } + save_json(skill_gaps_file, skill_gaps_data) + + # Generate context markdown + context_md = generate_learning_context(new_prefs, new_behaviors, topics_data, skill_gaps_data) + context_file.write_text(context_md) + + print(f'\n✅ Learning cycle complete!', file=sys.stderr) + print(f' Output: {ldir}', file=sys.stderr) + + # Print summary + if new_prefs.get('topTopics'): + print('\n📈 Top Topics:', file=sys.stderr) + for t in new_prefs['topTopics'][:5]: + print(f" - {t['topic']}: {t['count']}", file=sys.stderr) + + if skill_gaps_data['priority']: + 
print('\n⚠️ Skill Gaps:', file=sys.stderr) + for gap in skill_gaps_data['priority']: + print(f" - {gap['area']}: {gap['frequency']} errors", file=sys.stderr) + + return { + 'preferences': new_prefs, + 'behaviors': new_behaviors, + 'topics': topics_data, + 'skill_gaps': skill_gaps_data, + } + + +def main(): + parser = argparse.ArgumentParser( + description='Extract preferences and patterns from NATS events', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=''' +Examples: + cortex learn --since 24h + cortex learn --since 7d --nats nats://localhost:4222 + cortex learn --jsonl events.jsonl + cat events.jsonl | cortex learn --jsonl - + ''' + ) + parser.add_argument('--since', default='24h', + help='Time window (e.g. 24h, 7d, 30m)') + parser.add_argument('--nats', default='nats://localhost:4222', + help='NATS server URL') + parser.add_argument('--jsonl', metavar='FILE', + help='Load events from JSONL file (use - for stdin)') + parser.add_argument('--agent', default='main', + help='Agent name for output directory') + parser.add_argument('--json', action='store_true', + help='Output results as JSON') + + args = parser.parse_args() + + # Parse duration + try: + duration = parse_duration(args.since) + hours = duration.total_seconds() / 3600 + except ValueError as e: + print(f"❌ {e}", file=sys.stderr) + sys.exit(1) + + # Fetch events + if args.jsonl: + events = load_jsonl_events(args.jsonl, int(hours)) + else: + import asyncio + events = asyncio.run(fetch_nats_events(args.nats, int(hours))) + + if not events: + print("⚠️ No events found", file=sys.stderr) + sys.exit(0) + + # Run learning cycle + results = run_learning_cycle(events, args.agent) + + if args.json: + print(json.dumps(results, indent=2, ensure_ascii=False)) + + +if __name__ == '__main__': + main() diff --git a/cortex/scheduler.py b/cortex/scheduler.py index b678dea..205421f 100644 --- a/cortex/scheduler.py +++ b/cortex/scheduler.py @@ -44,6 +44,30 @@ JOBS = { "default_interval_min": 30, "calendar": 
"*-*-* *:0/30:00", }, + "learn": { + "description": "Extract preferences from NATS events", + "command": "cortex learn --since {interval}", + "default_interval_min": 360, # 6 hours + "calendar": "*-*-* 0/6:00:00", + }, + "context": { + "description": "Generate learning context from events", + "command": "cortex context --events 2000", + "default_interval_min": 360, # 6 hours + "calendar": "*-*-* 0/6:00:00", + }, + "tracker": { + "description": "Scan for commitments and contradictions", + "command": "cortex track scan --since {interval}", + "default_interval_min": 1440, # daily + "calendar": "*-*-* 06:00:00", + }, + "sentinel": { + "description": "Security feed aggregation and CVE matching", + "command": "cortex sentinel scan && cortex sentinel matches", + "default_interval_min": 360, # 6 hours + "calendar": "*-*-* 0/6:00:00", + }, } diff --git a/cortex/sentinel.py b/cortex/sentinel.py new file mode 100644 index 0000000..7f6f3ba --- /dev/null +++ b/cortex/sentinel.py @@ -0,0 +1,807 @@ +#!/usr/bin/env python3 +"""Cortex Sentinel — Security Feed Aggregation and CVE Matching. 
+ +Consolidated from ~/clawd/scripts/sentinel/ (rss-fetch.py, db.py, cve-match.py, report-gen.py) + +Features: +- RSS security feed aggregation +- SQLite-based deduplication +- CVE matching against local inventory +- Report generation (markdown + AI summary) + +Usage: + cortex sentinel scan [--nmap] + cortex sentinel report [--llm] + cortex sentinel matches + cortex sentinel stats +""" + +import argparse +import hashlib +import json +import os +import re +import sqlite3 +import sys +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Optional + +import requests + +from cortex.config import cortex_home + +# Try to import feedparser (optional dependency) +try: + import feedparser + HAS_FEEDPARSER = True +except ImportError: + HAS_FEEDPARSER = False + +# Disable SSL warnings for problematic feeds +try: + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +except ImportError: + pass + + +# --- Configuration --- + +def _env(key: str, default: str = '') -> str: + return os.environ.get(key, default) + + +def sentinel_dir() -> Path: + """Base directory for sentinel data.""" + return cortex_home() / "sentinel" + + +def sentinel_db_path() -> Path: + """Path to sentinel SQLite database.""" + return sentinel_dir() / "sentinel.db" + + +def feeds_dir() -> Path: + """Directory for feed output files.""" + d = sentinel_dir() / "feeds" + d.mkdir(parents=True, exist_ok=True) + return d + + +def reports_dir() -> Path: + """Directory for report output files.""" + d = sentinel_dir() / "reports" + d.mkdir(parents=True, exist_ok=True) + return d + + +def llm_url() -> str: + """LLM API URL for AI summaries.""" + return _env('CORTEX_LLM_URL', 'http://localhost:11434/api/generate') + + +def llm_model() -> str: + """LLM model to use.""" + return _env('CORTEX_LLM_MODEL', 'mistral:7b') + + +# User agent for HTTP requests +USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0" + + +# 
# --- Feed Configuration ---

# RSS sources to poll, keyed by a short source name.
# "verify_ssl": False marks feeds whose TLS setup is known to be broken;
# fetch_feed also flips it automatically after an SSLError.
FEEDS = {
    # Security News
    "bleepingcomputer": {
        "url": "https://www.bleepingcomputer.com/feed/",
        "category": "security-news"
    },
    "hackernews": {
        "url": "https://feeds.feedburner.com/TheHackersNews",
        "category": "security-news"
    },
    "darkreading": {
        "url": "https://www.darkreading.com/rss.xml",
        "category": "security-news"
    },
    "schneier": {
        "url": "https://www.schneier.com/feed/atom/",
        "category": "security-news"
    },
    "securityweek": {
        "url": "https://www.securityweek.com/feed/",
        "category": "security-news"
    },

    # CVE/Vulnerability Feeds
    "nvd-recent": {
        "url": "https://nvd.nist.gov/feeds/xml/cve/misc/nvd-rss.xml",
        "category": "cve",
        "verify_ssl": False
    },
    "cisa-alerts": {
        "url": "https://www.cisa.gov/cybersecurity-advisories/all.xml",
        "category": "cve",
        "verify_ssl": False
    },

    # AI/ML Security
    "huggingface-blog": {
        "url": "https://huggingface.co/blog/feed.xml",
        "category": "ai-security"
    },
    "google-ai-blog": {
        "url": "https://blog.google/technology/ai/rss/",
        "category": "ai-security"
    },

    # Exploit Databases
    "exploitdb": {
        "url": "https://www.exploit-db.com/rss.xml",
        "category": "exploits",
        "verify_ssl": False
    },
}


# Keywords that indicate relevance to our infrastructure
# (matched as lowercase substrings against title+summary in fetch_feed).
RELEVANT_KEYWORDS = [
    # Tech stack
    "linux", "debian", "nginx", "traefik", "docker", "postgresql", "redis",
    "node.js", "nodejs", "python", "openssh", "git", "chromium", "openssl",
    "ollama", "llm", "whisper", "matrix", "synapse", "element",

    # Hardware
    "amd", "radeon", "rocm", "fritzbox", "avm",

    # Critical issues
    "critical", "rce", "remote code execution", "zero-day", "0-day",
    "ransomware", "supply chain", "authentication bypass",

    # AI-specific
    "prompt injection", "jailbreak", "model extraction", "adversarial",
    "llm vulnerability", "ai safety", "model poisoning"
]


# Software inventory for CVE matching (scanned via item aliases by
# check_inventory_match).
INVENTORY = {
    "operating_systems": [
        {"name": "Debian", "version": "12", "aliases": ["debian", "bookworm"]},
        {"name": "Linux Kernel", "version": "6.1", "aliases": ["linux", "kernel"]},
    ],
    "services": [
        {"name": "OpenSSH", "version": "9.2", "aliases": ["ssh", "openssh", "sshd"]},
        {"name": "Nginx", "version": "1.22", "aliases": ["nginx"]},
        {"name": "Traefik", "version": "2.10", "aliases": ["traefik"]},
        {"name": "Docker", "version": "24", "aliases": ["docker", "containerd"]},
        {"name": "Node.js", "version": "22", "aliases": ["node", "nodejs", "npm"]},
        {"name": "Python", "version": "3.11", "aliases": ["python", "python3"]},
        {"name": "PostgreSQL", "version": "15", "aliases": ["postgres", "postgresql"]},
        {"name": "Redis", "version": "7", "aliases": ["redis"]},
        {"name": "Ollama", "version": "0.1", "aliases": ["ollama", "llama"]},
    ],
    "applications": [
        {"name": "Chromium", "version": "120", "aliases": ["chromium", "chrome"]},
        {"name": "Git", "version": "2.39", "aliases": ["git"]},
        {"name": "OpenSSL", "version": "3.0", "aliases": ["openssl", "ssl", "tls"]},
    ],
    "hardware": [
        {"name": "AMD Radeon RX 5700 XT", "aliases": ["amd", "radeon", "rx5700", "navi", "gfx1010"]},
        {"name": "Fritz!Box", "aliases": ["fritzbox", "fritz", "avm"]},
    ]
}


# --- Database ---

def get_db() -> sqlite3.Connection:
    """Get database connection with row factory."""
    # Ensure the data directory exists before sqlite tries to create the file.
    sentinel_dir().mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(sentinel_db_path())
    conn.row_factory = sqlite3.Row  # rows addressable by column name
    return conn


def init_db() -> None:
    """Initialize database schema (idempotent: CREATE ... IF NOT EXISTS)."""
    conn = get_db()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS alerts (
            id TEXT PRIMARY KEY,
            source TEXT NOT NULL,
            category TEXT,
            title TEXT NOT NULL,
            link TEXT,
            summary TEXT,
            severity TEXT DEFAULT 'info',
            relevant INTEGER DEFAULT 0,
            first_seen TEXT NOT NULL,
            last_seen TEXT NOT NULL,
            seen_count INTEGER DEFAULT 1,
            notified INTEGER DEFAULT 0,
            acknowledged INTEGER DEFAULT 0
        );

        CREATE INDEX IF NOT EXISTS idx_alerts_source ON alerts(source);
        CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity);
        CREATE INDEX IF NOT EXISTS idx_alerts_first_seen ON alerts(first_seen);
        CREATE INDEX IF NOT EXISTS idx_alerts_notified ON alerts(notified);

        CREATE TABLE IF NOT EXISTS runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            total_fetched INTEGER,
            new_alerts INTEGER,
            duplicates INTEGER,
            notified INTEGER
        );
    """)
    conn.commit()
    conn.close()


def add_alert(alert: dict) -> bool:
    """Add alert if new, update if exists. Returns True if new."""
    conn = get_db()
    now = datetime.now().isoformat()

    cur = conn.execute("SELECT id, seen_count FROM alerts WHERE id = ?", (alert["id"],))
    existing = cur.fetchone()

    if existing:
        # Duplicate: just bump the last-seen timestamp and counter.
        conn.execute("""
            UPDATE alerts SET last_seen = ?, seen_count = seen_count + 1
            WHERE id = ?
        """, (now, alert["id"]))
        conn.commit()
        conn.close()
        return False
    else:
        # New alert; title/summary truncated to bound row size.
        conn.execute("""
            INSERT INTO alerts (id, source, category, title, link, summary,
                                severity, relevant, first_seen, last_seen)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            alert["id"],
            alert.get("source", "unknown"),
            alert.get("category", ""),
            alert.get("title", "")[:500],
            alert.get("link", ""),
            alert.get("summary", "")[:1000],
            alert.get("severity", "info"),
            1 if alert.get("relevant") else 0,
            now,
            now
        ))
        conn.commit()
        conn.close()
        return True


def log_run(total: int, new: int, dupes: int, notified: int = 0) -> None:
    """Log a sentinel run."""
    conn = get_db()
    conn.execute("""
        INSERT INTO runs (timestamp, total_fetched, new_alerts, duplicates, notified)
        VALUES (?, ?, ?, ?, ?)
    """, (datetime.now().isoformat(), total, new, dupes, notified))
    conn.commit()
    conn.close()


def get_stats() -> dict:
    """Get database statistics (totals, severity histogram, recent runs)."""
    conn = get_db()
    stats = {}

    cur = conn.execute("SELECT COUNT(*) FROM alerts")
    stats["total_alerts"] = cur.fetchone()[0]

    cur = conn.execute("""
        SELECT severity, COUNT(*) as count FROM alerts
        GROUP BY severity ORDER BY count DESC
    """)
    stats["by_severity"] = {row["severity"]: row["count"] for row in cur.fetchall()}

    cur = conn.execute("SELECT COUNT(*) FROM alerts WHERE notified = 0 AND relevant = 1")
    stats["unnotified"] = cur.fetchone()[0]

    # ISO-8601 strings compare chronologically, so a plain > works here.
    yesterday = (datetime.now() - timedelta(days=1)).isoformat()
    cur = conn.execute("SELECT COUNT(*) FROM alerts WHERE first_seen > ?", (yesterday,))
    stats["last_24h"] = cur.fetchone()[0]

    cur = conn.execute("SELECT * FROM runs ORDER BY timestamp DESC LIMIT 5")
    stats["recent_runs"] = [dict(row) for row in cur.fetchall()]

    conn.close()
    return stats


def get_unnotified_alerts(min_severity: str = "medium") -> list[dict]:
    """Get alerts that haven't been notified yet.

    Returns at most 20 relevant alerts at or above *min_severity*,
    most severe (then newest) first.
    """
    severity_order = {"critical": 1, "high": 2, "medium": 3, "info": 4}
    min_level = severity_order.get(min_severity, 3)

    conn = get_db()
    cur = conn.execute("""
        SELECT * FROM alerts
        WHERE notified = 0 AND relevant = 1
        ORDER BY
            CASE severity
                WHEN 'critical' THEN 1
                WHEN 'high' THEN 2
                WHEN 'medium' THEN 3
                ELSE 4
            END,
            first_seen DESC
        LIMIT 20
    """)
    alerts = [dict(row) for row in cur.fetchall()]
    conn.close()

    # Post-filter in Python: unknown severities rank as info (4).
    return [a for a in alerts if severity_order.get(a["severity"], 4) <= min_level]


def get_recent_alerts(limit: int = 50) -> list[dict]:
    """Get recent alerts from database, newest first."""
    conn = get_db()
    cur = conn.execute("""
        SELECT * FROM alerts
        ORDER BY first_seen DESC
        LIMIT ?
    """, (limit,))
    alerts = [dict(row) for row in cur.fetchall()]
    conn.close()
    return alerts
# --- Feed Fetching ---

def fetch_feed(name: str, config: dict) -> list[dict]:
    """Fetch one RSS feed and normalize its entries to alert dicts.

    Returns at most 20 entries; an empty list on any error. On an SSL
    error the fetch is retried exactly once with verification disabled
    (config is mutated first, so the retry cannot recurse further).
    """
    if not HAS_FEEDPARSER:
        # fix: was an f-string with no placeholders
        print("  ⚠️ feedparser not installed", file=sys.stderr)
        return []

    url = config["url"]
    verify_ssl = config.get("verify_ssl", True)

    try:
        headers = {"User-Agent": USER_AGENT}
        response = requests.get(url, headers=headers, timeout=15, verify=verify_ssl)
        response.raise_for_status()

        feed = feedparser.parse(response.content)

        # bozo flags malformed XML; only bail when nothing was parsed at all.
        if feed.bozo and not feed.entries:
            print(f"  ⚠️ {name}: Parse error", file=sys.stderr)
            return []

        entries = []
        for entry in feed.entries[:20]:  # Max 20 per feed
            title = entry.get("title", "No title")
            link = entry.get("link", "")
            summary = entry.get("summary", entry.get("description", ""))[:500]
            published = entry.get("published", entry.get("updated", ""))

            # Relevance: any stack keyword appears in title+summary.
            text_check = f"{title} {summary}".lower()
            is_relevant = any(kw in text_check for kw in RELEVANT_KEYWORDS)

            # Coarse keyword-based severity triage.
            severity = "info"
            if any(kw in text_check for kw in ["critical", "rce", "zero-day", "0-day", "ransomware"]):
                severity = "critical"
            elif any(kw in text_check for kw in ["high", "remote", "exploit", "vulnerability"]):
                severity = "high"
            elif any(kw in text_check for kw in ["medium", "moderate", "security"]):
                severity = "medium"

            entries.append({
                # Stable ID from source+link so reposts dedupe in SQLite.
                "id": hashlib.md5(f"{name}:{link}".encode()).hexdigest()[:12],
                "source": name,
                "category": config["category"],
                "title": title,
                "link": link,
                "summary": summary[:300],
                "published": published,
                "severity": severity,
                "relevant": is_relevant,
                "fetched_at": datetime.now().isoformat()
            })

        print(f"  ✅ {name}: {len(entries)} entries", file=sys.stderr)
        return entries

    except requests.exceptions.SSLError:
        if verify_ssl:
            config["verify_ssl"] = False
            return fetch_feed(name, config)  # single retry without verification
        return []
    except requests.exceptions.Timeout:
        print(f"  ❌ {name}: Timeout", file=sys.stderr)
        return []
    except requests.exceptions.RequestException as e:
        print(f"  ❌ {name}: {type(e).__name__}", file=sys.stderr)
        return []
    except Exception as e:
        print(f"  ❌ {name}: {e}", file=sys.stderr)
        return []


def fetch_all_feeds() -> tuple[list[dict], int, int]:
    """Fetch all configured feeds. Returns (entries, successful, failed)."""
    all_entries = []
    successful = 0
    failed = 0

    for name, config in FEEDS.items():
        # Pass a copy so the SSL-retry mutation never sticks to FEEDS.
        entries = fetch_feed(name, config.copy())
        if entries:
            all_entries.extend(entries)
            successful += 1
        else:
            failed += 1

    return all_entries, successful, failed


# --- CVE Matching ---

def check_inventory_match(text: str, inventory: Optional[dict] = None) -> list[dict]:
    """Check if *text* mentions any inventory items.

    *inventory* defaults to the module-level INVENTORY; passing an explicit
    dict of the same shape makes the function reusable with other inventories.

    Fix: aliases are matched on word boundaries instead of raw substrings,
    so e.g. alias "git" no longer fires on "digital".
    """
    if inventory is None:
        inventory = INVENTORY

    text_lower = text.lower()
    matches = []

    for category, items in inventory.items():
        for item in items:
            for alias in item.get("aliases", []):
                if re.search(rf"\b{re.escape(alias)}\b", text_lower):
                    matches.append({
                        "category": category,
                        "name": item["name"],
                        "version": item.get("version"),
                        "matched_alias": alias
                    })
                    break  # one match per inventory item is enough

    return matches


def analyze_matches(alerts: list[dict]) -> dict:
    """Analyze alerts for inventory matches and rank the relevant ones."""
    relevant = []
    critical = []
    category_counts = {}

    for alert in alerts:
        text = f"{alert.get('title', '')} {alert.get('summary', '')}"
        matches = check_inventory_match(text)

        if matches:
            alert["inventory_matches"] = matches
            alert["match_count"] = len(matches)
            relevant.append(alert)

            if alert.get("severity") == "critical":
                critical.append(alert)

            for match in matches:
                cat = match["category"]
                category_counts[cat] = category_counts.get(cat, 0) + 1

    # fix: rank severity explicitly — the previous alphabetical string sort
    # ordered "info" ahead of "medium".
    sev_rank = {"critical": 0, "high": 1, "medium": 2, "info": 3}
    relevant.sort(key=lambda x: (-x.get("match_count", 0),
                                 sev_rank.get(x.get("severity", "info"), 3)))

    return {
        "analysis_time": datetime.now().isoformat(),
        "source_alerts": len(alerts),
        "relevant_alerts": len(relevant),
        "critical_relevant": len(critical),
        "category_breakdown": category_counts,
        "critical": critical[:10],
        "relevant": relevant[:20],
    }


# --- Report Generation ---

def generate_report(data: dict, use_llm: bool = False) -> str:
    """Generate a markdown security report from analyze_matches() output.

    When *use_llm* is set and there are relevant alerts, an AI summary
    section is appended via get_ai_summary().
    """
    now = datetime.now()

    lines = [
        "# 🔒 Security Sentinel Report",
        f"**Generated:** {now.strftime('%Y-%m-%d %H:%M')}",
        ""
    ]

    # Stats
    stats = get_stats()
    lines.extend([
        "## 📊 Database Stats",
        f"- **Total alerts:** {stats['total_alerts']}",
        f"- **Last 24h:** {stats['last_24h']}",
        f"- **Unnotified:** {stats['unnotified']}",
        ""
    ])

    # Matches
    if data.get("relevant"):
        lines.extend([
            f"## 🎯 Relevant Alerts ({data['relevant_alerts']})",
            ""
        ])

        if data.get("critical"):
            lines.append("### ⚠️ Critical")
            for alert in data["critical"][:5]:
                matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
                lines.extend([
                    f"- **{alert['title'][:80]}**",
                    f"  - Source: {alert.get('source', 'unknown')}",
                    f"  - Affects: {matches}",
                    ""
                ])

        lines.append("### 📋 Other Relevant")
        for alert in data["relevant"][:10]:
            # Skip entries already listed in the critical section.
            if alert in data.get("critical", []):
                continue
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            lines.append(f"- {alert['title'][:60]}... ({matches})")

        lines.append("")

    # AI Summary
    if use_llm and data.get("relevant"):
        lines.extend(["## 🤖 AI Summary", ""])
        summary = get_ai_summary(data["relevant"][:10])
        lines.extend([summary, ""])

    # Actions
    lines.extend([
        "## 📝 Recommended Actions",
        ""
    ])

    if data.get("critical"):
        lines.append("1. Review critical alerts and check for available patches")

    if stats["unnotified"] > 10:
        lines.append(f"2. Process {stats['unnotified']} unnotified alerts")

    if not data.get("critical") and stats["unnotified"] <= 10:
        lines.append("✅ No immediate actions required")

    return "\n".join(lines)
def get_ai_summary(alerts: list[dict]) -> str:
    """Get AI summary of alerts from the configured LLM (German prompt).

    Returns the model's summary text, or a parenthesized fallback string
    when the LLM is unreachable or answers with a non-200 status.
    """
    if not alerts:
        return "No alerts to summarize."

    alert_text = "\n".join([
        f"- [{a.get('severity', 'info').upper()}] {a.get('title', '')}"
        for a in alerts[:15]
    ])

    prompt = f"""Du bist ein Security-Analyst. Fasse diese Security-Alerts kurz zusammen (max 5 Sätze, Deutsch).
Fokus: Was ist kritisch? Was erfordert Aktion?

Alerts:
{alert_text}

Zusammenfassung:"""

    try:
        response = requests.post(
            llm_url(),
            json={
                "model": llm_model(),
                "prompt": prompt,
                "stream": False,
                "options": {"temperature": 0.3, "num_predict": 300}
            },
            timeout=60
        )

        if response.status_code == 200:
            return response.json().get("response", "").strip()
    except Exception as e:
        return f"(LLM nicht erreichbar: {e})"

    return "(Zusammenfassung nicht verfügbar)"


# --- Commands ---

def cmd_scan(include_nmap: bool = False) -> None:
    """Scan security feeds and update database.

    include_nmap is accepted from the CLI but not acted on in this function
    yet (the --nmap flag is wired through main()).
    """
    init_db()

    print(f"🛡️ Sentinel Scan — {datetime.now().strftime('%Y-%m-%d %H:%M')}", file=sys.stderr)
    print(f"   Fetching {len(FEEDS)} feeds...", file=sys.stderr)

    all_entries, successful, failed = fetch_all_feeds()

    print(f"\n   Feeds: {successful}/{successful+failed} OK", file=sys.stderr)

    # Deduplicate via SQLite: add_alert returns True only for unseen IDs.
    print("\n🔍 Deduplicating...", file=sys.stderr)
    new_count = 0
    dupe_count = 0
    new_entries = []

    for entry in all_entries:
        if add_alert(entry):
            new_entries.append(entry)
            new_count += 1
        else:
            dupe_count += 1

    # Log run
    log_run(len(all_entries), new_count, dupe_count, 0)

    # Stats over this run's genuinely-new entries.
    relevant_new = sum(1 for e in new_entries if e.get("relevant"))
    critical_new = sum(1 for e in new_entries if e.get("severity") == "critical")

    print(f"\n📊 Summary:", file=sys.stderr)
    print(f"   Fetched: {len(all_entries)}", file=sys.stderr)
    print(f"   New: {new_count} ({relevant_new} relevant, {critical_new} critical)", file=sys.stderr)
    print(f"   Duplicates: {dupe_count}", file=sys.stderr)

    # Save the run's new entries as JSON for downstream consumers.
    output = {
        "fetched_at": datetime.now().isoformat(),
        "stats": {
            "total_fetched": len(all_entries),
            "new_alerts": new_count,
            "duplicates": dupe_count,
            "relevant": relevant_new,
            "critical": critical_new
        },
        "entries": new_entries
    }

    output_file = feeds_dir() / "alerts_latest.json"
    output_file.write_text(json.dumps(output, indent=2))
    print(f"   Output: {output_file}", file=sys.stderr)


def cmd_matches() -> None:
    """Show CVE matches against inventory (reads the last 100 alerts)."""
    alerts = get_recent_alerts(100)

    if not alerts:
        print("No alerts in database. Run 'cortex sentinel scan' first.")
        return

    data = analyze_matches(alerts)

    print(f"🎯 Inventory Matches ({data['relevant_alerts']} of {data['source_alerts']})\n")

    if data.get("critical"):
        print("⚠️ CRITICAL:\n")
        for alert in data["critical"][:5]:
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            print(f"  • {alert['title'][:70]}")
            print(f"    Affects: {matches}\n")

    if data.get("relevant"):
        print("\n📋 Other relevant:\n")
        for alert in data["relevant"][:10]:
            # Skip entries already shown in the critical section.
            if alert in data.get("critical", []):
                continue
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            print(f"  • {alert['title'][:60]}... ({matches})")

    if data.get("category_breakdown"):
        print(f"\n📊 By category: {data['category_breakdown']}")

    # Save a timestamped JSON snapshot of this analysis.
    report_file = reports_dir() / f"match_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    report_file.write_text(json.dumps(data, indent=2))


def cmd_report(use_llm: bool = False) -> None:
    """Generate a security report, save it, and maintain a 'latest' symlink."""
    alerts = get_recent_alerts(100)
    data = analyze_matches(alerts)

    report = generate_report(data, use_llm)

    # Save timestamped markdown report.
    report_file = reports_dir() / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
    report_file.write_text(report)

    # Symlink latest; is_symlink() also catches a dangling previous link.
    latest = reports_dir() / "report_latest.md"
    if latest.exists() or latest.is_symlink():
        latest.unlink()
    latest.symlink_to(report_file.name)

    print(f"✅ Report saved: {report_file}", file=sys.stderr)
    print(report)


def cmd_stats() -> None:
    """Show database statistics."""
    init_db()
    stats = get_stats()

    print("📊 Sentinel Stats\n")
    print(f"Total alerts: {stats['total_alerts']}")
    print(f"Last 24h: {stats['last_24h']}")
    print(f"Unnotified: {stats['unnotified']}")

    if stats.get("by_severity"):
        print(f"\nBy severity:")
        for sev, count in stats["by_severity"].items():
            print(f"  {sev}: {count}")

    if stats.get("recent_runs"):
        print(f"\nRecent runs:")
        for run in stats["recent_runs"][:3]:
            ts = run.get("timestamp", "")[:16]
            print(f"  {ts} — {run.get('new_alerts', 0)} new, {run.get('duplicates', 0)} dupes")


# --- Main ---

def main():
    """CLI entry point for `cortex sentinel` (scan/matches/report/stats)."""
    parser = argparse.ArgumentParser(
        description='Security Feed Aggregation and CVE Matching',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Commands:
  scan       Fetch security feeds and update database
  matches    Show alerts matching local inventory
  report     Generate markdown security report
  stats      Show database statistics

Examples:
  cortex sentinel scan
  cortex sentinel matches
  cortex sentinel report --llm
    '''
    )

    sub = parser.add_subparsers(dest='command')

    # scan
    scan_p = sub.add_parser('scan', help='Fetch security feeds')
    scan_p.add_argument('--nmap', action='store_true',
                        help='Include network scan (slow)')

    # matches
    sub.add_parser('matches', help='Show inventory matches')

    # report
    report_p = sub.add_parser('report', help='Generate report')
    report_p.add_argument('--llm', action='store_true',
                          help='Include AI summary')

    # stats
    sub.add_parser('stats', help='Show database stats')

    args = parser.parse_args()

    if args.command == 'scan':
        cmd_scan(getattr(args, 'nmap', False))
    elif args.command == 'matches':
        cmd_matches()
    elif args.command == 'report':
        cmd_report(getattr(args, 'llm', False))
    elif args.command == 'stats':
        cmd_stats()
    else:
        parser.print_help()


if __name__ == '__main__':
    main()
# --- Configuration ---

def _env(key: str, default: str = '') -> str:
    """Read an environment variable, falling back to *default*."""
    return os.environ.get(key, default)


def llm_url() -> str:
    """LLM API URL (Ollama or OpenAI-compatible)."""
    return _env('CORTEX_LLM_URL', 'http://localhost:11434/api/generate')


def llm_model() -> str:
    """LLM model to use."""
    return _env('CORTEX_LLM_MODEL', 'mistral:7b')


def tracker_db_path() -> Path:
    """Path to brain-tracker.json database."""
    return memory_dir() / 'brain-tracker.json'


def get_nats_bin() -> str:
    """Find the nats binary in common locations, falling back to PATH."""
    for path in [
        os.path.expanduser('~/bin/nats'),
        '/usr/local/bin/nats',
        '/usr/bin/nats',
    ]:
        if os.path.exists(path):
            return path
    return 'nats'


# --- Database ---

def load_db() -> dict:
    """Load the tracker database, or return an empty skeleton.

    `processedIds` is stored as a JSON list but used as a set in memory.
    """
    path = tracker_db_path()
    if path.exists():
        data = json.loads(path.read_text())
        # Convert processedIds list back to set
        data['processedIds'] = set(data.get('processedIds', []))
        return data
    return {
        'commitments': [],
        'claims': [],
        'contradictions': [],
        'processedIds': set(),
        'lastRun': None,
    }


def save_db(db: dict) -> None:
    """Persist the tracker database (sets serialized back to lists)."""
    path = tracker_db_path()
    path.parent.mkdir(parents=True, exist_ok=True)

    to_save = {
        **db,
        'processedIds': list(db['processedIds']),
    }
    path.write_text(json.dumps(to_save, indent=2, ensure_ascii=False))


# --- LLM Integration ---

def query_llm(prompt: str, timeout: int = 25) -> str:
    """Query the configured LLM; returns '' on any failure.

    The API flavour (Ollama vs OpenAI-compatible) is inferred from the URL.
    """
    url = llm_url()
    model = llm_model()

    if '/api/generate' in url:
        # Ollama-native API
        try:
            response = requests.post(
                url,
                json={
                    'model': model,
                    'prompt': prompt,
                    'stream': False,
                    'options': {
                        'temperature': 0.3,
                        'num_predict': 500,
                    }
                },
                timeout=timeout
            )
            if response.status_code == 200:
                return response.json().get('response', '')
        except Exception as e:
            print(f"⚠️ LLM error: {e}", file=sys.stderr)
            return ''
    else:
        # OpenAI-compatible chat API
        try:
            response = requests.post(
                url,
                headers={
                    'Authorization': f"Bearer {_env('CORTEX_LLM_API_KEY', '')}",
                    'Content-Type': 'application/json',
                },
                json={
                    'model': model,
                    'messages': [{'role': 'user', 'content': prompt}],
                    'temperature': 0.3,
                    'max_tokens': 500,
                },
                timeout=timeout
            )
            if response.status_code == 200:
                return response.json()['choices'][0]['message']['content']
        except Exception as e:
            print(f"⚠️ LLM error: {e}", file=sys.stderr)
            return ''

    # Non-200 status: fall through silently with an empty answer.
    return ''


def query_ollama_cli(prompt: str, timeout: int = 25) -> str:
    """Query Ollama via CLI as a fallback; returns '' on any failure."""
    try:
        result = subprocess.run(
            ['ollama', 'run', llm_model()],
            input=prompt,
            capture_output=True,
            text=True,
            timeout=timeout
        )
        return result.stdout or ''
    except Exception:
        return ''


# --- Event Fetching ---

def fetch_nats_events(subject: str, count: int = 100) -> list[dict]:
    """Fetch up to *count* recent events for *subject* via the nats CLI.

    Strategy: read the subject's last stream sequence, create a throwaway
    pull consumer starting *count* messages back, drain it, and remove it
    again. Returns [] on any error.
    """
    nats_bin = get_nats_bin()
    events: list[dict] = []

    try:
        # Last message tells us the current stream sequence for the subject.
        result = subprocess.run(
            [nats_bin, 'stream', 'get', 'openclaw-events', f'--last-for={subject}'],
            capture_output=True, text=True, timeout=5
        )

        match = re.search(r'openclaw-events#(\d+)', result.stdout or '')
        if not match:
            return []

        last_seq = int(match.group(1))
        start_seq = max(1, last_seq - count)

        # Ephemeral consumer name; best-effort removal before and after.
        consumer_name = f'brain-{int(datetime.now().timestamp())}'

        subprocess.run(
            [nats_bin, 'consumer', 'rm', 'openclaw-events', consumer_name, '--force'],
            capture_output=True, timeout=3
        )

        subprocess.run(
            [nats_bin, 'consumer', 'add', 'openclaw-events', consumer_name,
             '--pull', f'--deliver={start_seq}', '--ack=none',
             f'--filter={subject}', '--defaults'],
            capture_output=True, timeout=5
        )

        result = subprocess.run(
            [nats_bin, 'consumer', 'next', 'openclaw-events', consumer_name,
             '--count', str(count), '--raw'],
            capture_output=True, text=True, timeout=15
        )

        subprocess.run(
            [nats_bin, 'consumer', 'rm', 'openclaw-events', consumer_name, '--force'],
            capture_output=True, timeout=3
        )

        # Keep only lines that look like JSON objects; skip CLI noise.
        for line in (result.stdout or '').split('\n'):
            line = line.strip()
            if not line or not line.startswith('{'):
                continue
            try:
                events.append(json.loads(line))
            except json.JSONDecodeError:
                continue

    except Exception as e:
        print(f"⚠️ NATS error: {e}", file=sys.stderr)

    return events


def load_jsonl_events(source: str, hours: int = 24) -> list[dict]:
    """Load events from a JSONL file (or stdin when source is '-').

    Keeps events whose timestamp is inside the last *hours* window; events
    with no timestamp (ts == 0) are kept. Malformed lines are skipped.
    Fix: the file is now opened via a context manager (with an explicit
    encoding), so it is closed even if parsing raises.
    """
    events: list[dict] = []
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    def _consume(lines) -> None:
        # Shared parsing loop for both file and stdin input.
        for raw in lines:
            raw = raw.strip()
            if not raw:
                continue
            try:
                event = json.loads(raw)
            except json.JSONDecodeError:
                continue
            ts = event.get('timestamp', event.get('timestampMs', 0))
            if ts == 0 or ts >= cutoff:
                events.append(event)

    if source == '-':
        _consume(sys.stdin)
    else:
        path = Path(source)
        if not path.exists():
            return []
        with path.open(encoding='utf-8') as fh:
            _consume(fh)

    return events


# --- Text Processing ---

def extract_text(event: dict) -> str:
    """Extract message text from an event payload (several known shapes)."""
    payload = event.get('payload', {})

    # Shape 1: payload.text_preview = [{"text": ...}, ...]
    previews = payload.get('text_preview')
    if previews and isinstance(previews[0], dict):
        return previews[0].get('text', '')

    # Shape 2: payload.data.text
    data_text = payload.get('data', {}).get('text')
    if data_text:
        return data_text

    # Shape 3: payload.text
    if payload.get('text'):
        return payload['text']

    return ''
def clean_text(text: str) -> str:
    """Normalize raw message text for LLM analysis (capped at 250 chars)."""
    # Ordered scrub passes: bracketed metadata, table pipes, markdown
    # punctuation, URLs, then whitespace collapsing.
    scrub_passes = (
        (r'\[.*?\]', ''),        # [timestamps], [message_id]
        (r'\|', ' '),            # table separators
        (r'[*_`#]', ''),         # markdown punctuation
        (r'https?://\S+', ''),   # URLs
        (r'\n+', ' '),           # newlines
        (r'\s+', ' '),           # runs of whitespace
    )
    for pattern, replacement in scrub_passes:
        text = re.sub(pattern, replacement, text)
    return text.strip()[:250]


# --- Analysis ---

def analyze_message(text: str, existing_claims: list[dict]) -> Optional[dict]:
    """Ask the LLM whether a message contains commitments or factual claims.

    Returns the parsed JSON analysis, or None when the message is too
    short, is housekeeping noise, or the LLM response cannot be parsed.
    (existing_claims is currently unused but kept for interface stability.)
    """
    clean = clean_text(text)
    if len(clean) < 20 or 'HEARTBEAT' in clean or 'cron:' in clean:
        return None

    prompt = f'''Analyze this message for:
1. Commitments/promises (someone agreeing to do something)
2. Factual claims (verifiable statements)

Message: "{clean}"

Respond with JSON only:
{{
  "commitment": {{"found": false}} or {{"found": true, "what": "description", "who": "user/assistant", "deadline": "if mentioned"}},
  "claims": [] or [{{"claim": "statement", "topic": "category", "entities": ["names"]}}]
}}'''

    raw = query_llm(prompt) or query_ollama_cli(prompt)

    # The model may wrap the JSON in chatter — grab the outermost braces.
    blob = re.search(r'\{[\s\S]*\}', raw)
    if blob is None:
        return None
    try:
        return json.loads(blob.group(0))
    except json.JSONDecodeError:
        return None


def check_contradiction(new_claim: dict, existing_claims: list[dict]) -> Optional[dict]:
    """Ask the LLM whether *new_claim* contradicts any related stored claim.

    Claims are "related" when any entity name of one is a substring of an
    entity name of the other. Only the first two related claims are sent
    to the LLM. Returns the verdict merged with the offending old claim,
    or None when nothing contradicts.
    """
    fresh_entities = [e.lower() for e in new_claim.get('entities', [])]

    related = []
    for old in existing_claims:
        old_entities = [e.lower() for e in old.get('entities', [])]
        if any(a in b or b in a for a in fresh_entities for b in old_entities):
            related.append(old)

    if not related:
        return None

    for candidate in related[:2]:  # Check only first 2
        prompt = f'''Do these contradict?
Old ({candidate.get("date", "unknown")}): "{candidate.get("claim", "")}"
New: "{new_claim.get("claim", "")}"
Respond: {{"contradicts": true/false, "why": "explanation", "severity": "minor/major"}}'''

        raw = query_llm(prompt) or query_ollama_cli(prompt)
        blob = re.search(r'\{[\s\S]*?\}', raw)
        if not blob:
            continue
        try:
            verdict = json.loads(blob.group(0))
        except json.JSONDecodeError:
            continue
        if verdict.get('contradicts'):
            return {**verdict, 'original': candidate}

    return None
+Old ({old.get("date", "unknown")}): "{old.get("claim", "")}" +New: "{new_claim.get("claim", "")}" +Respond: {{"contradicts": true/false, "why": "explanation", "severity": "minor/major"}}''' + + response = query_llm(prompt) or query_ollama_cli(prompt) + json_match = re.search(r'\{[\s\S]*?\}', response) + + if json_match: + try: + check = json.loads(json_match.group(0)) + if check.get('contradicts'): + return {**check, 'original': old} + except json.JSONDecodeError: + continue + + return None + + +# --- Commands --- + +def cmd_scan(hours: int = 24, jsonl: Optional[str] = None) -> None: + """Scan recent messages for commitments and claims.""" + db = load_db() + cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000) + + print(f"🧠 Brain Tracker — Scanning last {hours}h\n", file=sys.stderr) + + # Fetch events + if jsonl: + all_events = load_jsonl_events(jsonl, hours) + else: + print("📥 Fetching messages...", file=sys.stderr) + in_events = fetch_nats_events('openclaw.events.main.conversation_message_in', 200) + out_events = fetch_nats_events('openclaw.events.main.conversation_message_out', 200) + all_events = in_events + out_events + + # Filter and sort + events = [ + e for e in all_events + if e.get('timestamp', 0) >= cutoff and e.get('id') not in db['processedIds'] + ] + events.sort(key=lambda x: x.get('timestamp', 0)) + + print(f" Found {len(events)} new events to analyze\n", file=sys.stderr) + + commitments = 0 + claims = 0 + contradictions = 0 + + for i, event in enumerate(events): + text = extract_text(event) + clean = clean_text(text) + + if len(clean) < 20: + db['processedIds'].add(event.get('id', '')) + continue + + print(f"\r[{i+1}/{len(events)}] Analyzing...", end='', file=sys.stderr) + + analysis = analyze_message(text, db['claims']) + + if not analysis: + db['processedIds'].add(event.get('id', '')) + continue + + date = datetime.fromtimestamp(event.get('timestamp', 0) / 1000).strftime('%Y-%m-%d') + + # Process commitment + if 
analysis.get('commitment', {}).get('found'): + c = analysis['commitment'] + import random + import string + commit_id = f"commit-{int(datetime.now().timestamp())}-{''.join(random.choices(string.ascii_lowercase, k=4))}" + + db['commitments'].append({ + 'id': commit_id, + 'what': c.get('what', ''), + 'who': c.get('who', ''), + 'deadline': c.get('deadline'), + 'source': clean[:100], + 'date': date, + 'status': 'open', + }) + commitments += 1 + print(f"\n✅ Commitment: \"{c.get('what', '')}\"", file=sys.stderr) + + # Process claims + for claim in analysis.get('claims', []): + if not claim.get('claim'): + continue + + import random + import string + claim_id = f"claim-{int(datetime.now().timestamp())}-{''.join(random.choices(string.ascii_lowercase, k=4))}" + + new_claim = { + 'id': claim_id, + 'claim': claim['claim'], + 'topic': claim.get('topic', 'general'), + 'entities': claim.get('entities', []), + 'source': clean[:100], + 'date': date, + } + + # Check for contradictions + contra = check_contradiction(new_claim, db['claims']) + if contra: + db['contradictions'].append({ + 'id': f"contra-{int(datetime.now().timestamp())}", + 'new_claim': new_claim, + 'old_claim': contra['original'], + 'why': contra.get('why', ''), + 'severity': contra.get('severity', 'unknown'), + 'date': date, + 'status': 'unresolved', + }) + contradictions += 1 + print(f"\n⚠️ CONTRADICTION: {contra.get('why', '')}", file=sys.stderr) + + db['claims'].append(new_claim) + claims += 1 + + db['processedIds'].add(event.get('id', '')) + + # Save periodically + if i % 10 == 0: + save_db(db) + + db['lastRun'] = datetime.now().isoformat() + save_db(db) + + print(f"\n\n📊 Results:", file=sys.stderr) + print(f" New Commitments: {commitments}", file=sys.stderr) + print(f" New Claims: {claims}", file=sys.stderr) + print(f" Contradictions: {contradictions}", file=sys.stderr) + print(f" Total in DB: {len(db['commitments'])} commitments, {len(db['claims'])} claims", file=sys.stderr) + + +def cmd_list() -> None: + 
"""List open commitments and unresolved contradictions.""" + db = load_db() + + open_commits = [c for c in db.get('commitments', []) if c.get('status') == 'open'] + unresolved = [c for c in db.get('contradictions', []) if c.get('status') == 'unresolved'] + + print("🧠 Brain Tracker Status\n") + + if open_commits: + print(f"📋 Open Commitments ({len(open_commits)}):\n") + for c in open_commits: + print(f" [{c['id'][-6:]}] {c.get('what', '')}") + deadline = f" | ⏰ {c['deadline']}" if c.get('deadline') else '' + print(f" 👤 {c.get('who', 'unknown')} | 📅 {c.get('date', '')}{deadline}\n") + else: + print("✅ No open commitments\n") + + if unresolved: + print(f"⚠️ Contradictions ({len(unresolved)}):\n") + for c in unresolved: + severity = c.get('severity', 'UNKNOWN').upper() + print(f" [{severity}] {c.get('why', '')}") + print(f" Old: \"{c.get('old_claim', {}).get('claim', '')}\"") + print(f" New: \"{c.get('new_claim', {}).get('claim', '')}\"\n") + else: + print("✅ No contradictions\n") + + print(f"📊 Total: {len(db.get('claims', []))} claims, {len(db.get('commitments', []))} commitments") + + +def cmd_done(id_fragment: str) -> None: + """Mark a commitment as done.""" + db = load_db() + + commit = next((c for c in db['commitments'] if id_fragment in c['id']), None) + if commit: + commit['status'] = 'done' + commit['completed'] = datetime.now().isoformat() + save_db(db) + print(f"✅ Marked done: {commit.get('what', '')}") + else: + print(f"❌ Not found: {id_fragment}") + + +def cmd_check(statement: str) -> None: + """Check a statement against existing claims.""" + db = load_db() + q = statement.lower() + + matches = [ + c for c in db.get('claims', []) + if q in c.get('claim', '').lower() or + any(q in e.lower() for e in c.get('entities', [])) + ] + + if not matches: + print(f"No claims matching: {statement}") + return + + print(f"🔍 Claims matching \"{statement}\" ({len(matches)}):\n") + for c in matches[-15:]: + print(f"[{c.get('date', '')}] {c.get('claim', '')}") + if 
c.get('entities'): + print(f" Entities: {', '.join(c['entities'])}") + print() + + +# --- Main --- + +def main(): + parser = argparse.ArgumentParser( + description='Commitment and Contradiction Detection', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=''' +Commands: + scan Analyze recent messages for commitments and claims + list Show open commitments and contradictions + done Mark a commitment as completed + check Search claims for a statement + +Examples: + cortex track scan --since 24h + cortex track list + cortex track done commit-abc123 + cortex track check "TypeDB" + ''' + ) + + sub = parser.add_subparsers(dest='command') + + # scan + scan_p = sub.add_parser('scan', help='Scan messages for commitments/claims') + scan_p.add_argument('--since', default='24h', + help='Time window (e.g. 24h, 7d)') + scan_p.add_argument('--jsonl', metavar='FILE', + help='Load events from JSONL file') + + # list + sub.add_parser('list', help='Show open items') + + # done + done_p = sub.add_parser('done', help='Mark commitment as done') + done_p.add_argument('id', help='Commitment ID (or fragment)') + + # check + check_p = sub.add_parser('check', help='Search claims') + check_p.add_argument('statement', nargs='+', help='Statement to search') + + args = parser.parse_args() + + if args.command == 'scan': + # Parse duration + match = re.match(r'^(\d+)([hdm])$', args.since.lower()) + if not match: + print(f"❌ Invalid duration: {args.since}", file=sys.stderr) + sys.exit(1) + + value = int(match.group(1)) + unit = match.group(2) + hours = value if unit == 'h' else (value * 24 if unit == 'd' else value / 60) + + cmd_scan(int(hours), args.jsonl) + + elif args.command == 'list': + cmd_list() + + elif args.command == 'done': + cmd_done(args.id) + + elif args.command == 'check': + cmd_check(' '.join(args.statement)) + + else: + parser.print_help() + + +if __name__ == '__main__': + main() diff --git a/tests/test_new_modules.py b/tests/test_new_modules.py new file mode 100644 
index 0000000..bd71d8a
--- /dev/null
+++ b/tests/test_new_modules.py
@@ -0,0 +1,436 @@
#!/usr/bin/env python3
"""Tests for new cortex modules: learn, context, tracker, sentinel.

Run with: CORTEX_HOME=~/clawd python3 -m pytest tests/test_new_modules.py -v
"""

import json
import os
import sys
import tempfile
from pathlib import Path
from datetime import datetime, timedelta
from unittest.mock import patch, MagicMock

import pytest

# Set up CORTEX_HOME before importing modules
# (the cortex package presumably reads these at import time — hence the
# environment must be primed before the imports below)
os.environ.setdefault('CORTEX_HOME', os.path.expanduser('~/clawd'))
os.environ.setdefault('CORTEX_MEMORY_DIR', os.path.expanduser('~/clawd/memory'))

from cortex import learn, context, tracker, sentinel
from cortex.config import cortex_home, memory_dir


# --- Fixtures ---

@pytest.fixture
def temp_cortex_home(tmp_path):
    """Temporary CORTEX_HOME for isolated tests.

    Points CORTEX_HOME/CORTEX_MEMORY_DIR at a per-test tmp directory and
    restores (or removes) the previous values on teardown.
    """
    old_home = os.environ.get('CORTEX_HOME')
    old_memory = os.environ.get('CORTEX_MEMORY_DIR')

    os.environ['CORTEX_HOME'] = str(tmp_path)
    os.environ['CORTEX_MEMORY_DIR'] = str(tmp_path / 'memory')

    yield tmp_path

    # Teardown: restore the caller's environment exactly as it was
    if old_home:
        os.environ['CORTEX_HOME'] = old_home
    else:
        os.environ.pop('CORTEX_HOME', None)
    if old_memory:
        os.environ['CORTEX_MEMORY_DIR'] = old_memory
    else:
        os.environ.pop('CORTEX_MEMORY_DIR', None)


@pytest.fixture
def sample_events():
    """Sample NATS events for testing (timestamps are ms since epoch)."""
    now = int(datetime.now().timestamp() * 1000)
    return [
        {
            'id': 'evt-001',
            'timestamp': now - 3600000,  # 1 hour ago
            'type': 'conversation_message_in',
            'payload': {
                'data': {'text': 'Das war mega! Super gemacht 👍'}
            }
        },
        {
            'id': 'evt-002',
            'timestamp': now - 7200000,  # 2 hours ago
            'type': 'conversation_message_out',
            'payload': {
                'data': {'text': 'Ich werde das TypeScript Projekt morgen fertigstellen.'}
            }
        },
        {
            'id': 'evt-003',
            'timestamp': now - 10800000,  # 3 hours ago
            'type': 'tool_call',
            'payload': {
                'data': {'name': 'Read', 'text': 'Reading file...'}
            }
        },
        {
            'id': 'evt-004',
            'timestamp': now - 14400000,  # 4 hours ago
            'type': 'conversation_message_in',
            'payload': {
                'data': {'text': 'Nein, ich meinte eigentlich Python, nicht JavaScript.'}
            }
        },
        {
            'id': 'evt-005',
            'timestamp': now - 18000000,  # 5 hours ago
            'type': 'error',
            'payload': {
                'data': {'text': 'Error: Module not found in Node.js project'}
            }
        },
    ]


@pytest.fixture
def sample_alerts():
    """Sample security alerts for testing (one per severity of interest)."""
    return [
        {
            'id': 'alert-001',
            'source': 'bleepingcomputer',
            'category': 'security-news',
            'title': 'Critical OpenSSH vulnerability allows remote code execution',
            'link': 'https://example.com/openssh-vuln',
            'summary': 'A critical RCE vulnerability in OpenSSH...',
            'severity': 'critical',
            'relevant': True,
        },
        {
            'id': 'alert-002',
            'source': 'hackernews',
            'category': 'security-news',
            'title': 'New Docker container escape vulnerability discovered',
            'link': 'https://example.com/docker-escape',
            'summary': 'Researchers found a container escape bug in Docker...',
            'severity': 'high',
            'relevant': True,
        },
        {
            'id': 'alert-003',
            'source': 'schneier',
            'category': 'security-news',
            'title': 'Thoughts on Password Managers',
            'link': 'https://example.com/password-mgr',
            'summary': 'Discussion about password manager security...',
            'severity': 'info',
            'relevant': False,
        },
    ]


# --- Learn Module Tests ---

class TestLearn:
    """Tests for cortex.learn module."""

    def test_parse_duration_hours(self):
        """Test duration parsing for hours."""
        delta = learn.parse_duration('24h')
        assert delta == timedelta(hours=24)

    def test_parse_duration_days(self):
        """Test duration parsing for days."""
        delta = learn.parse_duration('7d')
        assert delta == timedelta(days=7)

    def test_parse_duration_minutes(self):
        """Test duration parsing for minutes."""
        delta = learn.parse_duration('30m')
        assert delta == timedelta(minutes=30)

    def test_parse_duration_invalid(self):
        """Test invalid duration format."""
        with pytest.raises(ValueError):
            learn.parse_duration('invalid')

    def test_extract_text_from_payload(self):
        """Test text extraction from various event formats."""
        event = {'payload': {'data': {'text': 'Hello world'}}}
        assert learn.extract_text(event) == 'Hello world'

    def test_extract_text_from_content(self):
        """Test text extraction from content field."""
        event = {'payload': {'content': 'Test content'}}
        assert learn.extract_text(event) == 'Test content'

    def test_extract_signals_positive(self, sample_events):
        """Test extraction of positive signals."""
        signals = learn.extract_signals(sample_events)
        assert len(signals['positive']) >= 1
        # Should detect "mega" and "Super gemacht"

    def test_extract_signals_corrections(self, sample_events):
        """Test extraction of correction signals."""
        signals = learn.extract_signals(sample_events)
        assert len(signals['corrections']) >= 1
        # Should detect "Nein, ich meinte eigentlich"

    def test_extract_signals_skill_gaps(self, sample_events):
        """Test extraction of skill gaps."""
        signals = learn.extract_signals(sample_events)
        # Either key spelling is accepted — depends on learn's keyword table
        assert 'node' in signals['skill_gaps'] or 'nodejs' in signals['skill_gaps'].keys()

    def test_derive_preferences(self, sample_events):
        """Test preference derivation."""
        signals = learn.extract_signals(sample_events)
        prefs = learn.derive_preferences(signals, {})

        assert 'lastAnalysis' in prefs
        assert 'signalCounts' in prefs
        assert 'explicit' in prefs

    def test_run_learning_cycle(self, temp_cortex_home, sample_events):
        """Test full learning cycle."""
        results = learn.run_learning_cycle(sample_events, 'test')

        assert 'preferences' in results
        assert 'behaviors' in results
        assert 'topics' in results
        assert 'skill_gaps' in results

        # Check files were created under the isolated CORTEX_HOME
        ldir = temp_cortex_home / 'learning' / 'test'
        assert (ldir / 'preferences.json').exists()
        assert (ldir / 'behaviors.json').exists()
        assert (ldir / 'learning-context.md').exists()


# --- Context Module Tests ---

class TestContext:
    """Tests for cortex.context module."""

    def test_extract_text(self):
        """Test text extraction from events."""
        event = {'payload': {'data': {'text': 'Test message'}}}
        text = context.extract_text(event)
        assert text == 'Test message'

    def test_analyze_events_topics(self, sample_events):
        """Test topic detection in events."""
        patterns = context.analyze_events(sample_events)
        # Should have detected topics based on keywords
        assert isinstance(patterns['topics'], dict)

    def test_analyze_events_tools(self, sample_events):
        """Test tool tracking."""
        patterns = context.analyze_events(sample_events)
        assert 'Read' in patterns['tools']

    def test_analyze_events_languages(self, sample_events):
        """Test language detection."""
        patterns = context.analyze_events(sample_events)
        # Should detect German (more German words in samples)
        assert patterns['languages']['de'] > 0

    def test_generate_context_md(self, sample_events):
        """Test markdown generation."""
        patterns = context.analyze_events(sample_events)
        md = context.generate_context_md(patterns, len(sample_events), None)

        assert '# Learning Context' in md
        assert 'User Preferences' in md
        assert 'Most Used Tools' in md

    def test_run_context_generation(self, temp_cortex_home, sample_events):
        """Test full context generation."""
        patterns = context.run_context_generation(sample_events, agent='test')

        # Check output file
        output = temp_cortex_home / 'learning' / 'test' / 'learning-context.md'
        assert output.exists()
        content = output.read_text()
        assert '# Learning Context' in content


# --- Tracker Module Tests ---

class TestTracker:
    """Tests for cortex.tracker module."""

    def test_clean_text(self):
        """Test text cleaning."""
        text = "Hello [timestamp] **bold** https://example.com"
        clean = tracker.clean_text(text)

        assert '[timestamp]' not in clean
        assert '**' not in clean
        assert 'https://' not in clean

    def test_extract_text(self):
        """Test text extraction from events."""
        event = {'payload': {'text_preview': [{'text': 'Test'}]}}
        text = tracker.extract_text(event)
        assert text == 'Test'

    def test_load_save_db(self, temp_cortex_home):
        """Test database load/save."""
        db = tracker.load_db()

        assert 'commitments' in db
        assert 'claims' in db
        assert 'processedIds' in db

        # Add some data
        db['commitments'].append({'id': 'test-001', 'what': 'Test commitment'})
        db['processedIds'].add('evt-001')

        tracker.save_db(db)

        # Reload and verify — NOTE(review): 'processedIds' is used as a set
        # here; save_db/load_db must round-trip it through JSON accordingly.
        db2 = tracker.load_db()
        assert len(db2['commitments']) == 1
        assert 'evt-001' in db2['processedIds']

    @patch('cortex.tracker.query_llm')
    def test_analyze_message_no_llm(self, mock_llm):
        """Test message analysis fallback."""
        # Mocked LLM returns '' so analyze_message cannot get a verdict
        mock_llm.return_value = ''

        result = tracker.analyze_message('Short text', [])
        assert result is None  # Too short after cleaning

    def test_cmd_list(self, temp_cortex_home, capsys):
        """Test list command output."""
        # Create test data
        db = tracker.load_db()
        db['commitments'].append({
            'id': 'commit-test',
            'what': 'Test commitment',
            'who': 'user',
            'date': '2024-01-01',
            'status': 'open',
        })
        tracker.save_db(db)

        tracker.cmd_list()

        captured = capsys.readouterr()
        assert 'Test commitment' in captured.out


# --- Sentinel Module Tests ---

class TestSentinel:
    """Tests for cortex.sentinel module."""

    def test_init_db(self, temp_cortex_home):
        """Test database initialization."""
        sentinel.init_db()

        db_path = temp_cortex_home / 'sentinel' / 'sentinel.db'
        assert db_path.exists()

    def test_add_alert_new(self, temp_cortex_home, sample_alerts):
        """Test adding new alerts."""
        sentinel.init_db()

        alert = sample_alerts[0]
        is_new = sentinel.add_alert(alert)
        assert is_new is True

        # Adding same alert again should return False (dedup by id)
        is_new2 = sentinel.add_alert(alert)
        assert is_new2 is False

    def test_get_stats(self, temp_cortex_home, sample_alerts):
        """Test statistics retrieval."""
        sentinel.init_db()

        for alert in sample_alerts:
            sentinel.add_alert(alert)

        stats = sentinel.get_stats()

        assert stats['total_alerts'] == 3
        assert 'by_severity' in stats
        assert stats['by_severity'].get('critical', 0) == 1

    def test_check_inventory_match(self):
        """Test inventory matching."""
        text = "Critical vulnerability in OpenSSH and Docker"
        matches = sentinel.check_inventory_match(text)

        # Should match OpenSSH and Docker
        names = [m['name'] for m in matches]
        assert 'OpenSSH' in names
        assert 'Docker' in names

    def test_analyze_matches(self, sample_alerts):
        """Test alert analysis for matches."""
        result = sentinel.analyze_matches(sample_alerts)

        assert 'relevant_alerts' in result
        assert 'critical_relevant' in result
        assert 'category_breakdown' in result

    def test_generate_report(self, temp_cortex_home, sample_alerts):
        """Test report generation."""
        sentinel.init_db()

        for alert in sample_alerts:
            sentinel.add_alert(alert)

        # use_llm=False keeps the test offline/deterministic
        data = sentinel.analyze_matches(sample_alerts)
        report = sentinel.generate_report(data, use_llm=False)

        assert '# 🔒 Security Sentinel Report' in report
        assert 'Database Stats' in report


# --- Integration Tests ---

class TestIntegration:
    """Integration tests for module interactions.

    These are deliberately lenient smoke tests: each passes if the module's
    --help output contains an expected word OR the process exited cleanly.
    """

    def test_cli_learn_help(self):
        """Test learn command help."""
        import subprocess
        result = subprocess.run(
            ['python3', '-m', 'cortex.learn', '--help'],
            capture_output=True, text=True
        )
        assert 'preferences' in result.stdout.lower() or result.returncode == 0

    def test_cli_context_help(self):
        """Test context command help."""
        import subprocess
        result = subprocess.run(
            ['python3', '-m', 'cortex.context', '--help'],
            capture_output=True, text=True
        )
        assert 'events' in result.stdout.lower() or result.returncode == 0

    def test_cli_tracker_help(self):
        """Test tracker command help."""
        import subprocess
        result = subprocess.run(
            ['python3', '-m', 'cortex.tracker', '--help'],
            capture_output=True, text=True
        )
        assert 'scan' in result.stdout.lower() or result.returncode == 0

    def test_cli_sentinel_help(self):
        """Test sentinel command help."""
        import subprocess
        result = subprocess.run(
            ['python3', '-m', 'cortex.sentinel', '--help'],
            capture_output=True, text=True
        )
        assert 'scan' in result.stdout.lower() or result.returncode == 0


if __name__ == '__main__':
    pytest.main([__file__, '-v'])