darkplex-core/tests/test_shared_memory.py
Claudia fd7d75c0ed
Some checks failed
Tests / test (push) Failing after 2s
Merge darkplex-core into cortex — unified intelligence layer v0.2.0
- Merged all unique darkplex-core modules into cortex:
  - intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup, temporal, llm_extractor, loop)
  - governance/ subfolder (policy engine, risk scorer, evidence, enforcer, report generator)
  - entity_manager.py, knowledge_extractor.py
- Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.'
- Added 'darkplex' CLI alias alongside 'cortex'
- Package renamed to darkplex-core v0.2.0
- 405 tests passing (was 234)
- 14 new test files covering all merged modules
2026-02-12 08:43:02 +01:00

72 lines
2.3 KiB
Python

"""Tests for intelligence/shared_memory module."""
import json
import sys
from pathlib import Path
import pytest
# Prepend the directory two levels above this file to sys.path before the
# `cortex` import below -- presumably so the package resolves from a source
# checkout without being installed (verify against the project layout).
sys.path.insert(0, str(Path(__file__).parent.parent))
from cortex.intelligence.shared_memory import ALLOWED_AGENTS, Insight, SharedMemory
class TestInsight:
    """Checks construction, defaults and JSON round-tripping of ``Insight``."""

    def test_creation(self):
        """The agent is stored and a truthy timestamp exists without passing one."""
        insight = Insight(agent="claudia", topic="test", content="hello")
        assert insight.agent == "claudia"
        assert insight.timestamp  # not supplied above, yet expected to be set

    def test_to_json(self):
        """``to_json`` emits a JSON document carrying the agent and topic."""
        fields = {"agent": "claudia", "topic": "test", "content": "hello"}
        serialized = Insight(**fields).to_json()
        payload = json.loads(serialized)
        assert payload["agent"] == "claudia"
        assert payload["topic"] == "test"

    def test_from_json(self):
        """``from_json`` rebuilds the agent and confidence from ``to_json`` output."""
        original = Insight(
            agent="claudia",
            topic="test",
            content="hello",
            confidence=0.9,
        )
        serialized = original.to_json()
        restored = Insight.from_json(serialized)
        assert restored.agent == original.agent
        assert restored.confidence == 0.9

    def test_default_confidence(self):
        """Confidence is expected to be 0.8 when the caller does not pass it."""
        insight = Insight(agent="claudia", topic="t", content="c")
        assert insight.confidence == 0.8

    def test_tags(self):
        """Both tags handed to the constructor are kept."""
        tagged = Insight(agent="claudia", topic="t", content="c", tags=["a", "b"])
        tag_count = len(tagged.tags)
        assert tag_count == 2
class TestSharedMemory:
    """Checks agent validation and the not-connected guards of ``SharedMemory``."""

    def test_allowed_agent(self):
        """A name drawn from ``ALLOWED_AGENTS`` is accepted and stored."""
        agent = next(iter(ALLOWED_AGENTS))
        sm = SharedMemory(agent_name=agent)
        assert sm.agent_name == agent

    def test_disallowed_agent(self):
        """A name outside ``ALLOWED_AGENTS`` is rejected with ``ValueError``."""
        with pytest.raises(ValueError, match="not allowed"):
            SharedMemory(agent_name="hacker_bot")

    def test_not_connected_publish(self):
        """Publishing on a freshly constructed instance raises ``RuntimeError``."""
        import asyncio

        agent = next(iter(ALLOWED_AGENTS))
        sm = SharedMemory(agent_name=agent)
        # Build the payload outside the ``raises`` block so that only the
        # publish call itself can satisfy the expectation.
        insight = Insight(agent=agent, topic="t", content="c")
        with pytest.raises(RuntimeError, match="Not connected"):
            # asyncio.run() owns a fresh event loop; calling
            # asyncio.get_event_loop() without a running loop is deprecated
            # and can itself raise RuntimeError on newer Python versions.
            # NOTE(review): assumes ``publish`` is a coroutine function, as
            # the previous run_until_complete() usage implies.
            asyncio.run(sm.publish(insight))

    def test_not_connected_subscribe(self):
        """Subscribing on a freshly constructed instance raises ``RuntimeError``."""
        import asyncio

        agent = next(iter(ALLOWED_AGENTS))
        sm = SharedMemory(agent_name=agent)
        with pytest.raises(RuntimeError, match="Not connected"):
            # Same reasoning as the publish test: let asyncio.run() manage
            # the loop instead of the deprecated get_event_loop() pattern.
            # NOTE(review): assumes ``subscribe`` is a coroutine function.
            asyncio.run(sm.subscribe("t", lambda x: None))
class TestAllowedAgents:
    """Sanity checks on the ``ALLOWED_AGENTS`` collection."""

    def test_default_agents(self):
        """``"claudia"`` is a member and the collection holds at least one entry."""
        agents = ALLOWED_AGENTS
        assert "claudia" in agents
        assert len(agents) >= 1