#!/usr/bin/env python3
|
|
"""Tests for new cortex modules: learn, context, tracker, sentinel.
|
|
|
|
Run with: CORTEX_HOME=~/clawd python3 -m pytest tests/test_new_modules.py -v
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
|
|
# Set up CORTEX_HOME before importing modules
|
|
os.environ.setdefault('CORTEX_HOME', os.path.expanduser('~/clawd'))
|
|
os.environ.setdefault('CORTEX_MEMORY_DIR', os.path.expanduser('~/clawd/memory'))
|
|
|
|
from cortex import learn, context, tracker, sentinel
|
|
from cortex.config import cortex_home, memory_dir
|
|
|
|
|
|
# --- Fixtures ---
|
|
|
|
@pytest.fixture
def temp_cortex_home(tmp_path):
    """Point CORTEX_HOME / CORTEX_MEMORY_DIR at a temp dir for isolated tests.

    Yields the temporary home path. The previous environment values are
    restored automatically on teardown via patch.dict, which snapshots
    os.environ on entry and restores it on exit — replacing the manual
    save/set/pop bookkeeping and guaranteeing cleanup in every branch
    (variable previously set, previously unset, etc.).
    """
    with patch.dict(os.environ, {
        'CORTEX_HOME': str(tmp_path),
        'CORTEX_MEMORY_DIR': str(tmp_path / 'memory'),
    }):
        yield tmp_path
|
|
|
|
|
|
@pytest.fixture
def sample_events():
    """Sample NATS events for testing."""
    now_ms = int(datetime.now().timestamp() * 1000)
    hour_ms = 3600000

    def make(event_id, hours_ago, event_type, data):
        # All sample events share the same envelope; only the data varies.
        return {
            'id': event_id,
            'timestamp': now_ms - hours_ago * hour_ms,
            'type': event_type,
            'payload': {'data': data},
        }

    return [
        make('evt-001', 1, 'conversation_message_in',
             {'text': 'Das war mega! Super gemacht 👍'}),
        make('evt-002', 2, 'conversation_message_out',
             {'text': 'Ich werde das TypeScript Projekt morgen fertigstellen.'}),
        make('evt-003', 3, 'tool_call',
             {'name': 'Read', 'text': 'Reading file...'}),
        make('evt-004', 4, 'conversation_message_in',
             {'text': 'Nein, ich meinte eigentlich Python, nicht JavaScript.'}),
        make('evt-005', 5, 'error',
             {'text': 'Error: Module not found in Node.js project'}),
    ]
|
|
|
|
|
|
@pytest.fixture
def sample_alerts():
    """Sample security alerts for testing."""

    def alert(alert_id, source, title, link, summary, severity, relevant):
        # Every sample alert uses the same category; build the rest by args.
        return {
            'id': alert_id,
            'source': source,
            'category': 'security-news',
            'title': title,
            'link': link,
            'summary': summary,
            'severity': severity,
            'relevant': relevant,
        }

    return [
        alert('alert-001', 'bleepingcomputer',
              'Critical OpenSSH vulnerability allows remote code execution',
              'https://example.com/openssh-vuln',
              'A critical RCE vulnerability in OpenSSH...',
              'critical', True),
        alert('alert-002', 'hackernews',
              'New Docker container escape vulnerability discovered',
              'https://example.com/docker-escape',
              'Researchers found a container escape bug in Docker...',
              'high', True),
        alert('alert-003', 'schneier',
              'Thoughts on Password Managers',
              'https://example.com/password-mgr',
              'Discussion about password manager security...',
              'info', False),
    ]
|
|
|
|
|
|
# --- Learn Module Tests ---
|
|
|
|
class TestLearn:
    """Tests for cortex.learn module."""

    def test_parse_duration_hours(self):
        """Duration strings with an 'h' suffix parse to hours."""
        assert learn.parse_duration('24h') == timedelta(hours=24)

    def test_parse_duration_days(self):
        """Duration strings with a 'd' suffix parse to days."""
        assert learn.parse_duration('7d') == timedelta(days=7)

    def test_parse_duration_minutes(self):
        """Duration strings with an 'm' suffix parse to minutes."""
        assert learn.parse_duration('30m') == timedelta(minutes=30)

    def test_parse_duration_invalid(self):
        """Malformed duration strings raise ValueError."""
        with pytest.raises(ValueError):
            learn.parse_duration('invalid')

    def test_extract_text_from_payload(self):
        """Text is extracted from payload.data.text."""
        event = {'payload': {'data': {'text': 'Hello world'}}}
        assert learn.extract_text(event) == 'Hello world'

    def test_extract_text_from_content(self):
        """Text extraction falls back to the payload.content field."""
        event = {'payload': {'content': 'Test content'}}
        assert learn.extract_text(event) == 'Test content'

    def test_extract_signals_positive(self, sample_events):
        """Positive feedback ("mega", "Super gemacht") is detected."""
        signals = learn.extract_signals(sample_events)
        assert len(signals['positive']) >= 1

    def test_extract_signals_corrections(self, sample_events):
        """Corrections ("Nein, ich meinte eigentlich") are detected."""
        signals = learn.extract_signals(sample_events)
        assert len(signals['corrections']) >= 1

    def test_extract_signals_skill_gaps(self, sample_events):
        """The failing technology (Node.js) shows up as a skill gap."""
        signals = learn.extract_signals(sample_events)
        # `in dict` tests keys directly; the redundant .keys() call is gone.
        assert 'node' in signals['skill_gaps'] or 'nodejs' in signals['skill_gaps']

    def test_derive_preferences(self, sample_events):
        """derive_preferences produces the expected top-level keys."""
        signals = learn.extract_signals(sample_events)
        prefs = learn.derive_preferences(signals, {})

        assert 'lastAnalysis' in prefs
        assert 'signalCounts' in prefs
        assert 'explicit' in prefs

    def test_run_learning_cycle(self, temp_cortex_home, sample_events):
        """A full learning cycle returns results and writes output files."""
        results = learn.run_learning_cycle(sample_events, 'test')

        for key in ('preferences', 'behaviors', 'topics', 'skill_gaps'):
            assert key in results

        # Output files land under <CORTEX_HOME>/learning/<agent>/.
        ldir = temp_cortex_home / 'learning' / 'test'
        assert (ldir / 'preferences.json').exists()
        assert (ldir / 'behaviors.json').exists()
        assert (ldir / 'learning-context.md').exists()
|
|
|
|
|
|
# --- Context Module Tests ---
|
|
|
|
class TestContext:
    """Tests for cortex.context module."""

    def test_extract_text(self):
        """Text is extracted from payload.data.text."""
        event = {'payload': {'data': {'text': 'Test message'}}}
        assert context.extract_text(event) == 'Test message'

    def test_analyze_events_topics(self, sample_events):
        """Topic detection yields a mapping of detected topics."""
        patterns = context.analyze_events(sample_events)
        assert isinstance(patterns['topics'], dict)

    def test_analyze_events_tools(self, sample_events):
        """Tool calls are tracked by tool name."""
        patterns = context.analyze_events(sample_events)
        assert 'Read' in patterns['tools']

    def test_analyze_events_languages(self, sample_events):
        """German is detected (the samples are mostly German)."""
        patterns = context.analyze_events(sample_events)
        assert patterns['languages']['de'] > 0

    def test_generate_context_md(self, sample_events):
        """Generated markdown contains the expected section headers."""
        patterns = context.analyze_events(sample_events)
        md = context.generate_context_md(patterns, len(sample_events), None)

        assert '# Learning Context' in md
        assert 'User Preferences' in md
        assert 'Most Used Tools' in md

    def test_run_context_generation(self, temp_cortex_home, sample_events):
        """Full context generation writes learning-context.md."""
        # Only the on-disk side effect is checked here, so the return
        # value is deliberately not bound (the old unused local is gone).
        context.run_context_generation(sample_events, agent='test')

        output = temp_cortex_home / 'learning' / 'test' / 'learning-context.md'
        assert output.exists()
        assert '# Learning Context' in output.read_text()
|
|
|
|
|
|
# --- Tracker Module Tests ---
|
|
|
|
class TestTracker:
    """Tests for cortex.tracker module."""

    def test_clean_text(self):
        """Timestamps, markdown bold markers and URLs are stripped."""
        cleaned = tracker.clean_text("Hello [timestamp] **bold** https://example.com")

        for fragment in ('[timestamp]', '**', 'https://'):
            assert fragment not in cleaned

    def test_extract_text(self):
        """Text is pulled out of the text_preview payload list."""
        event = {'payload': {'text_preview': [{'text': 'Test'}]}}
        assert tracker.extract_text(event) == 'Test'

    def test_load_save_db(self, temp_cortex_home):
        """A fresh DB has the expected keys and round-trips through disk."""
        db = tracker.load_db()
        for key in ('commitments', 'claims', 'processedIds'):
            assert key in db

        # Mutate, persist, then reload to verify the round trip.
        db['commitments'].append({'id': 'test-001', 'what': 'Test commitment'})
        db['processedIds'].add('evt-001')
        tracker.save_db(db)

        reloaded = tracker.load_db()
        assert len(reloaded['commitments']) == 1
        assert 'evt-001' in reloaded['processedIds']

    @patch('cortex.tracker.query_llm')
    def test_analyze_message_no_llm(self, mock_llm):
        """Short messages yield None when the LLM returns nothing."""
        mock_llm.return_value = ''
        assert tracker.analyze_message('Short text', []) is None

    def test_cmd_list(self, temp_cortex_home, capsys):
        """cmd_list prints stored commitments to stdout."""
        db = tracker.load_db()
        record = {
            'id': 'commit-test',
            'what': 'Test commitment',
            'who': 'user',
            'date': '2024-01-01',
            'status': 'open',
        }
        db['commitments'].append(record)
        tracker.save_db(db)

        tracker.cmd_list()

        assert 'Test commitment' in capsys.readouterr().out
|
|
|
|
|
|
# --- Sentinel Module Tests ---
|
|
|
|
class TestSentinel:
    """Tests for cortex.sentinel module."""

    def test_init_db(self, temp_cortex_home):
        """init_db creates the SQLite file under CORTEX_HOME."""
        sentinel.init_db()
        assert (temp_cortex_home / 'sentinel' / 'sentinel.db').exists()

    def test_add_alert_new(self, temp_cortex_home, sample_alerts):
        """The first insert of an alert is new; a duplicate is not."""
        sentinel.init_db()
        first = sample_alerts[0]

        assert sentinel.add_alert(first) is True
        # Re-inserting the same alert must be reported as a duplicate.
        assert sentinel.add_alert(first) is False

    def test_get_stats(self, temp_cortex_home, sample_alerts):
        """Stats cover all inserted alerts, broken down by severity."""
        sentinel.init_db()
        for item in sample_alerts:
            sentinel.add_alert(item)

        stats = sentinel.get_stats()
        assert stats['total_alerts'] == 3
        assert 'by_severity' in stats
        assert stats['by_severity'].get('critical', 0) == 1

    def test_check_inventory_match(self):
        """OpenSSH and Docker mentions both match the inventory."""
        matches = sentinel.check_inventory_match(
            "Critical vulnerability in OpenSSH and Docker"
        )
        matched_names = {m['name'] for m in matches}
        assert 'OpenSSH' in matched_names
        assert 'Docker' in matched_names

    def test_analyze_matches(self, sample_alerts):
        """Analysis output exposes the expected summary keys."""
        result = sentinel.analyze_matches(sample_alerts)
        for key in ('relevant_alerts', 'critical_relevant', 'category_breakdown'):
            assert key in result

    def test_generate_report(self, temp_cortex_home, sample_alerts):
        """The rendered report contains the title and stats section."""
        sentinel.init_db()
        for item in sample_alerts:
            sentinel.add_alert(item)

        data = sentinel.analyze_matches(sample_alerts)
        report = sentinel.generate_report(data, use_llm=False)

        assert '# 🔒 Security Sentinel Report' in report
        assert 'Database Stats' in report
|
|
|
|
|
|
# --- Integration Tests ---
|
|
|
|
class TestIntegration:
|
|
"""Integration tests for module interactions."""
|
|
|
|
def test_cli_learn_help(self):
|
|
"""Test learn command help."""
|
|
import subprocess
|
|
result = subprocess.run(
|
|
['python3', '-m', 'cortex.learn', '--help'],
|
|
capture_output=True, text=True
|
|
)
|
|
assert 'preferences' in result.stdout.lower() or result.returncode == 0
|
|
|
|
def test_cli_context_help(self):
|
|
"""Test context command help."""
|
|
import subprocess
|
|
result = subprocess.run(
|
|
['python3', '-m', 'cortex.context', '--help'],
|
|
capture_output=True, text=True
|
|
)
|
|
assert 'events' in result.stdout.lower() or result.returncode == 0
|
|
|
|
def test_cli_tracker_help(self):
|
|
"""Test tracker command help."""
|
|
import subprocess
|
|
result = subprocess.run(
|
|
['python3', '-m', 'cortex.tracker', '--help'],
|
|
capture_output=True, text=True
|
|
)
|
|
assert 'scan' in result.stdout.lower() or result.returncode == 0
|
|
|
|
def test_cli_sentinel_help(self):
|
|
"""Test sentinel command help."""
|
|
import subprocess
|
|
result = subprocess.run(
|
|
['python3', '-m', 'cortex.sentinel', '--help'],
|
|
capture_output=True, text=True
|
|
)
|
|
assert 'scan' in result.stdout.lower() or result.returncode == 0
|
|
|
|
|
|
if __name__ == '__main__':
    # Propagate pytest's exit status so CI sees failures when this file
    # is run directly as a script (pytest.main's return code was
    # previously discarded, making the script always exit 0).
    sys.exit(pytest.main([__file__, '-v']))
|