"""Tests for intelligence/shared_memory module.""" import json import sys from pathlib import Path import pytest sys.path.insert(0, str(Path(__file__).parent.parent)) from cortex.intelligence.shared_memory import ALLOWED_AGENTS, Insight, SharedMemory class TestInsight: def test_creation(self): i = Insight(agent="claudia", topic="test", content="hello") assert i.agent == "claudia" assert i.timestamp # auto-set def test_to_json(self): i = Insight(agent="claudia", topic="test", content="hello") data = json.loads(i.to_json()) assert data["agent"] == "claudia" assert data["topic"] == "test" def test_from_json(self): i = Insight(agent="claudia", topic="test", content="hello", confidence=0.9) i2 = Insight.from_json(i.to_json()) assert i2.agent == i.agent assert i2.confidence == 0.9 def test_default_confidence(self): i = Insight(agent="claudia", topic="t", content="c") assert i.confidence == 0.8 def test_tags(self): i = Insight(agent="claudia", topic="t", content="c", tags=["a", "b"]) assert len(i.tags) == 2 class TestSharedMemory: def test_allowed_agent(self): agent = list(ALLOWED_AGENTS)[0] sm = SharedMemory(agent_name=agent) assert sm.agent_name == agent def test_disallowed_agent(self): with pytest.raises(ValueError, match="not allowed"): SharedMemory(agent_name="hacker_bot") def test_not_connected_publish(self): agent = list(ALLOWED_AGENTS)[0] sm = SharedMemory(agent_name=agent) with pytest.raises(RuntimeError, match="Not connected"): import asyncio asyncio.get_event_loop().run_until_complete( sm.publish(Insight(agent=agent, topic="t", content="c")) ) def test_not_connected_subscribe(self): agent = list(ALLOWED_AGENTS)[0] sm = SharedMemory(agent_name=agent) with pytest.raises(RuntimeError, match="Not connected"): import asyncio asyncio.get_event_loop().run_until_complete( sm.subscribe("t", lambda x: None) ) class TestAllowedAgents: def test_default_agents(self): assert "claudia" in ALLOWED_AGENTS assert len(ALLOWED_AGENTS) >= 1