"""Tests for intelligence/anticipator module.""" import sys from datetime import datetime, timedelta, timezone from pathlib import Path import pytest sys.path.insert(0, str(Path(__file__).parent.parent)) from cortex.intelligence.anticipator import ( AlertSeverity, Anticipator, PatternDefinition, Prediction, _detect_recurring_errors, _detect_ssl_expiry, _detect_usage_spike, ) class TestAnticipatorInit: def test_creates_with_builtin_patterns(self): a = Anticipator() assert len(a.patterns) == 3 def test_register_custom_pattern(self): a = Anticipator() p = PatternDefinition(name="test", description="test", detector=lambda e: None) a.register_pattern(p) assert len(a.patterns) == 4 class TestAnalyze: def test_empty_events(self): a = Anticipator() result = a.analyze([]) assert result == [] def test_no_matching_patterns(self): a = Anticipator() result = a.analyze([{"type": "unrelated", "data": {}}]) assert result == [] def test_detector_exception_handled(self): def bad_detector(events): raise RuntimeError("boom") a = Anticipator() a.patterns = [PatternDefinition(name="bad", description="", detector=bad_detector)] result = a.analyze([{}]) assert result == [] class TestSSLExpiry: def test_no_ssl_events(self): assert _detect_ssl_expiry([{"type": "other"}]) is None def test_expiring_soon(self): expiry = (datetime.now(timezone.utc) + timedelta(days=5)).isoformat() events = [{"type": "ssl_cert_check", "data": {"expiry": expiry, "domain": "example.com"}}] result = _detect_ssl_expiry(events) assert result is not None assert result.severity == AlertSeverity.WARNING def test_expiring_critical(self): expiry = (datetime.now(timezone.utc) + timedelta(days=1)).isoformat() events = [{"type": "ssl_cert_check", "data": {"expiry": expiry, "domain": "example.com"}}] result = _detect_ssl_expiry(events) assert result.severity == AlertSeverity.CRITICAL def test_not_expiring(self): expiry = (datetime.now(timezone.utc) + timedelta(days=60)).isoformat() events = [{"type": "ssl_cert_check", "data": {"expiry": expiry, "domain": "example.com"}}] assert _detect_ssl_expiry(events) is None class TestRecurringErrors: def test_no_errors(self): assert _detect_recurring_errors([]) is None def test_few_errors(self): events = [{"type": "error", "data": {"error_type": "timeout"}}] * 2 assert _detect_recurring_errors(events) is None def test_recurring_detected(self): events = [{"type": "error", "data": {"error_type": "timeout"}}] * 5 result = _detect_recurring_errors(events) assert result is not None assert result.metadata["count"] == 5 class TestUsageSpike: def test_insufficient_data(self): assert _detect_usage_spike([]) is None def test_normal_usage(self): events = [{"type": "usage_metric", "data": {"value": 10}} for _ in range(15)] assert _detect_usage_spike(events) is None def test_spike_detected(self): events = [{"type": "usage_metric", "data": {"value": 10}} for _ in range(12)] events[-1]["data"]["value"] = 100 events[-2]["data"]["value"] = 100 events[-3]["data"]["value"] = 100 result = _detect_usage_spike(events) assert result is not None