darkplex-core/tests/test_anticipator.py
Claudia fd7d75c0ed
Some checks failed
Tests / test (push) Failing after 2s
Merge darkplex-core into cortex — unified intelligence layer v0.2.0
- Merged all unique darkplex-core modules into cortex:
  - intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup, temporal, llm_extractor, loop)
  - governance/ subfolder (policy engine, risk scorer, evidence, enforcer, report generator)
  - entity_manager.py, knowledge_extractor.py
- Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.'
- Added 'darkplex' CLI alias alongside 'cortex'
- Package renamed to darkplex-core v0.2.0
- 405 tests passing (was 234)
- 14 new test files covering all merged modules
2026-02-12 08:43:02 +01:00

106 lines
3.5 KiB
Python

"""Tests for intelligence/anticipator module."""
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from cortex.intelligence.anticipator import (
AlertSeverity,
Anticipator,
PatternDefinition,
Prediction,
_detect_recurring_errors,
_detect_ssl_expiry,
_detect_usage_spike,
)
class TestAnticipatorInit:
def test_creates_with_builtin_patterns(self):
a = Anticipator()
assert len(a.patterns) == 3
def test_register_custom_pattern(self):
a = Anticipator()
p = PatternDefinition(name="test", description="test", detector=lambda e: None)
a.register_pattern(p)
assert len(a.patterns) == 4
class TestAnalyze:
def test_empty_events(self):
a = Anticipator()
result = a.analyze([])
assert result == []
def test_no_matching_patterns(self):
a = Anticipator()
result = a.analyze([{"type": "unrelated", "data": {}}])
assert result == []
def test_detector_exception_handled(self):
def bad_detector(events):
raise RuntimeError("boom")
a = Anticipator()
a.patterns = [PatternDefinition(name="bad", description="", detector=bad_detector)]
result = a.analyze([{}])
assert result == []
class TestSSLExpiry:
def test_no_ssl_events(self):
assert _detect_ssl_expiry([{"type": "other"}]) is None
def test_expiring_soon(self):
expiry = (datetime.now(timezone.utc) + timedelta(days=5)).isoformat()
events = [{"type": "ssl_cert_check", "data": {"expiry": expiry, "domain": "example.com"}}]
result = _detect_ssl_expiry(events)
assert result is not None
assert result.severity == AlertSeverity.WARNING
def test_expiring_critical(self):
expiry = (datetime.now(timezone.utc) + timedelta(days=1)).isoformat()
events = [{"type": "ssl_cert_check", "data": {"expiry": expiry, "domain": "example.com"}}]
result = _detect_ssl_expiry(events)
assert result.severity == AlertSeverity.CRITICAL
def test_not_expiring(self):
expiry = (datetime.now(timezone.utc) + timedelta(days=60)).isoformat()
events = [{"type": "ssl_cert_check", "data": {"expiry": expiry, "domain": "example.com"}}]
assert _detect_ssl_expiry(events) is None
class TestRecurringErrors:
def test_no_errors(self):
assert _detect_recurring_errors([]) is None
def test_few_errors(self):
events = [{"type": "error", "data": {"error_type": "timeout"}}] * 2
assert _detect_recurring_errors(events) is None
def test_recurring_detected(self):
events = [{"type": "error", "data": {"error_type": "timeout"}}] * 5
result = _detect_recurring_errors(events)
assert result is not None
assert result.metadata["count"] == 5
class TestUsageSpike:
def test_insufficient_data(self):
assert _detect_usage_spike([]) is None
def test_normal_usage(self):
events = [{"type": "usage_metric", "data": {"value": 10}} for _ in range(15)]
assert _detect_usage_spike(events) is None
def test_spike_detected(self):
events = [{"type": "usage_metric", "data": {"value": 10}} for _ in range(12)]
events[-1]["data"]["value"] = 100
events[-2]["data"]["value"] = 100
events[-3]["data"]["value"] = 100
result = _detect_usage_spike(events)
assert result is not None