"""Cross-Agent Memory Bus: NATS pub/sub for agent insights. Agents publish insights (observations, learned facts, warnings) to the bus. Other agents subscribe to topics relevant to their function. ⚠️ DATA ISOLATION: Only Vainplex-internal agents participate. """ from __future__ import annotations import json import logging import os from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Callable, Awaitable logger = logging.getLogger(__name__) NATS_URL = os.environ.get("NATS_URL", "nats://localhost:4222") # Only these agents are allowed to participate in shared memory ALLOWED_AGENTS: set[str] = set( os.environ.get("INTELLIGENCE_ALLOWED_AGENTS", "claudia,vera,stella,viola").split(",") ) INSIGHT_SUBJECT_PREFIX = "darkplex.intelligence.insights" @dataclass class Insight: """An agent insight to be shared across the memory bus.""" agent: str topic: str content: str confidence: float = 0.8 # 0.0-1.0 tags: list[str] = field(default_factory=list) timestamp: str = "" metadata: dict[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: if not self.timestamp: self.timestamp = datetime.now(timezone.utc).isoformat() def to_json(self) -> str: return json.dumps({ "agent": self.agent, "topic": self.topic, "content": self.content, "confidence": self.confidence, "tags": self.tags, "timestamp": self.timestamp, "metadata": self.metadata, }) @classmethod def from_json(cls, data: str) -> Insight: d = json.loads(data) return cls(**d) InsightHandler = Callable[[Insight], Awaitable[None]] class SharedMemory: """Cross-agent memory bus using NATS pub/sub. Usage: memory = SharedMemory(agent_name="claudia") await memory.connect() await memory.publish(Insight(agent="claudia", topic="infra", content="...")) await memory.subscribe("infra", handler) ⚠️ Enforces data isolation: only allowed agents can publish/subscribe. """ def __init__(self, agent_name: str, nats_url: str | None = None) -> None: if agent_name not in ALLOWED_AGENTS: raise ValueError( f"Agent '{agent_name}' is not allowed in shared memory. " f"Allowed: {ALLOWED_AGENTS}" ) self.agent_name = agent_name self.nats_url = nats_url or NATS_URL self._nats_client: Any = None self._subscriptions: list[Any] = [] async def connect(self) -> None: """Connect to the NATS server.""" try: import nats self._nats_client = await nats.connect(self.nats_url) logger.info("SharedMemory connected for agent '%s'", self.agent_name) except Exception: logger.exception("Failed to connect SharedMemory to NATS") raise async def publish(self, insight: Insight) -> None: """Publish an insight to the memory bus. Args: insight: The insight to share. Agent field must match this instance's agent. """ if not self._nats_client: raise RuntimeError("Not connected. Call connect() first.") if insight.agent not in ALLOWED_AGENTS: raise ValueError(f"Agent '{insight.agent}' not allowed to publish insights") subject = f"{INSIGHT_SUBJECT_PREFIX}.{insight.topic}" await self._nats_client.publish(subject, insight.to_json().encode()) logger.debug( "Published insight: %s/%s by %s", insight.topic, insight.content[:50], insight.agent ) async def subscribe(self, topic: str, handler: InsightHandler) -> None: """Subscribe to insights on a topic. Args: topic: Topic to subscribe to (supports NATS wildcards). handler: Async callback for received insights. """ if not self._nats_client: raise RuntimeError("Not connected. Call connect() first.") subject = f"{INSIGHT_SUBJECT_PREFIX}.{topic}" async def _message_handler(msg: Any) -> None: try: insight = Insight.from_json(msg.data.decode()) if insight.agent not in ALLOWED_AGENTS: logger.warning( "Ignoring insight from non-allowed agent: %s", insight.agent ) return await handler(insight) except Exception: logger.exception("Error handling insight message") sub = await self._nats_client.subscribe(subject, cb=_message_handler) self._subscriptions.append(sub) logger.info("Subscribed to insights: %s", subject) async def close(self) -> None: """Unsubscribe and disconnect.""" for sub in self._subscriptions: await sub.unsubscribe() self._subscriptions.clear() if self._nats_client: await self._nats_client.close() self._nats_client = None