darkplex-core/tests/test_temporal.py
Claudia c5e5ce9dc0
Some checks failed
Tests / test (push) Failing after 5s
fix: all imports updated to cortex.xxx namespace — 405 tests green
- Fixed bare 'from governance.' imports in source + tests
- Fixed bare 'from intelligence.' imports in tests
- Fixed mock.patch targets to use full cortex.xxx paths
- All 405 tests passing
2026-02-12 08:47:45 +01:00

77 lines
2.3 KiB
Python

"""Tests for intelligence/temporal.py — Temporal Context API."""
import sys
from datetime import datetime, timezone
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# Puts <home>/repos/darkplex-core/intelligence at the front of the import search path.
# NOTE(review): the import below resolves through the `cortex.` package, so this
# hard-coded, home-relative entry looks like a leftover — confirm before removing.
sys.path.insert(0, str(Path.home() / "repos" / "darkplex-core" / "intelligence"))
from cortex.intelligence.temporal import TemporalEntry, TemporalQuery, TemporalContext
class TestTemporalEntry:
    """Construction checks for ``TemporalEntry``."""

    def test_creation(self):
        """An entry built without a score keeps its source and reports 0.0 relevance."""
        stamp = datetime(2026, 1, 1, tzinfo=timezone.utc)
        entry = TemporalEntry(
            timestamp=stamp,
            source="nats",
            topic="ssl-cert",
            content="SSL cert expiring",
        )

        assert entry.source == "nats"
        assert entry.relevance_score == 0.0

    def test_metadata(self):
        """Explicitly supplied metadata and relevance score are readable afterwards."""
        fields = {
            "timestamp": datetime.now(timezone.utc),
            "source": "chromadb",
            "topic": "test",
            "content": "test",
            "metadata": {"key": "value"},
            "relevance_score": 0.95,
        }
        entry = TemporalEntry(**fields)

        assert entry.metadata["key"] == "value"
        assert entry.relevance_score == 0.95
class TestTemporalQuery:
    """Default and custom field checks for ``TemporalQuery``."""

    def test_defaults(self):
        """A topic-only query has limit 50 and lists both "nats" and "chromadb"."""
        query = TemporalQuery(topic="test")

        assert query.limit == 50
        for expected_source in ("nats", "chromadb"):
            assert expected_source in query.sources

    def test_custom(self):
        """Explicit limit and sources are kept on the query."""
        window_start = datetime(2026, 1, 1, tzinfo=timezone.utc)
        query = TemporalQuery(
            topic="ssl",
            start_time=window_start,
            limit=10,
            sources=["nats"],
        )

        assert query.limit == 10
        assert len(query.sources) == 1
class TestTemporalContext:
    """Checks for ``TemporalContext`` construction and use without any connection."""

    def test_init_defaults(self):
        """Both default URLs mention localhost."""
        context = TemporalContext()

        for url_attr in ("nats_url", "chromadb_url"):
            assert "localhost" in getattr(context, url_attr)

    def test_init_custom(self):
        """A custom NATS URL passed to the constructor is stored unchanged."""
        custom_nats = "nats://custom:4222"
        context = TemporalContext(
            nats_url=custom_nats,
            chromadb_url="http://custom:8000",
        )

        assert context.nats_url == custom_nats

    @pytest.mark.asyncio
    async def test_query_no_connections(self):
        """Querying a freshly constructed context yields an empty list."""
        context = TemporalContext()
        request = TemporalQuery(topic="test")

        # Nothing has been connected yet, so the result is expected to be empty.
        outcome = await context.query(request)

        assert outcome == []

    @pytest.mark.asyncio
    async def test_close_no_connection(self):
        """Closing a context that never connected completes without raising."""
        context = TemporalContext()

        # The test passes as long as this await does not raise.
        await context.close()