Some checks failed
Tests / test (push) Failing after 2s
- Merged all unique darkplex-core modules into cortex: - intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup, temporal, llm_extractor, loop) - governance/ subfolder (policy engine, risk scorer, evidence, enforcer, report generator) - entity_manager.py, knowledge_extractor.py - Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.' - Added 'darkplex' CLI alias alongside 'cortex' - Package renamed to darkplex-core v0.2.0 - 405 tests passing (was 234) - 14 new test files covering all merged modules
72 lines
2.3 KiB
Python
72 lines
2.3 KiB
Python
"""Tests for intelligence/shared_memory module."""
|
|
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
from cortex.intelligence.shared_memory import ALLOWED_AGENTS, Insight, SharedMemory
|
|
|
|
|
|
class TestInsight:
|
|
def test_creation(self):
|
|
i = Insight(agent="claudia", topic="test", content="hello")
|
|
assert i.agent == "claudia"
|
|
assert i.timestamp # auto-set
|
|
|
|
def test_to_json(self):
|
|
i = Insight(agent="claudia", topic="test", content="hello")
|
|
data = json.loads(i.to_json())
|
|
assert data["agent"] == "claudia"
|
|
assert data["topic"] == "test"
|
|
|
|
def test_from_json(self):
|
|
i = Insight(agent="claudia", topic="test", content="hello", confidence=0.9)
|
|
i2 = Insight.from_json(i.to_json())
|
|
assert i2.agent == i.agent
|
|
assert i2.confidence == 0.9
|
|
|
|
def test_default_confidence(self):
|
|
i = Insight(agent="claudia", topic="t", content="c")
|
|
assert i.confidence == 0.8
|
|
|
|
def test_tags(self):
|
|
i = Insight(agent="claudia", topic="t", content="c", tags=["a", "b"])
|
|
assert len(i.tags) == 2
|
|
|
|
|
|
class TestSharedMemory:
|
|
def test_allowed_agent(self):
|
|
agent = list(ALLOWED_AGENTS)[0]
|
|
sm = SharedMemory(agent_name=agent)
|
|
assert sm.agent_name == agent
|
|
|
|
def test_disallowed_agent(self):
|
|
with pytest.raises(ValueError, match="not allowed"):
|
|
SharedMemory(agent_name="hacker_bot")
|
|
|
|
def test_not_connected_publish(self):
|
|
agent = list(ALLOWED_AGENTS)[0]
|
|
sm = SharedMemory(agent_name=agent)
|
|
with pytest.raises(RuntimeError, match="Not connected"):
|
|
import asyncio
|
|
asyncio.get_event_loop().run_until_complete(
|
|
sm.publish(Insight(agent=agent, topic="t", content="c"))
|
|
)
|
|
|
|
def test_not_connected_subscribe(self):
|
|
agent = list(ALLOWED_AGENTS)[0]
|
|
sm = SharedMemory(agent_name=agent)
|
|
with pytest.raises(RuntimeError, match="Not connected"):
|
|
import asyncio
|
|
asyncio.get_event_loop().run_until_complete(
|
|
sm.subscribe("t", lambda x: None)
|
|
)
|
|
|
|
|
|
class TestAllowedAgents:
|
|
def test_default_agents(self):
|
|
assert "claudia" in ALLOWED_AGENTS
|
|
assert len(ALLOWED_AGENTS) >= 1
|