darkplex-core/cortex/governance/evidence.py
Claudia fd7d75c0ed
Some checks failed
Tests / test (push) Failing after 2s
Merge darkplex-core into cortex — unified intelligence layer v0.2.0
- Merged all unique darkplex-core modules into cortex:
  - intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup, temporal, llm_extractor, loop)
  - governance/ subfolder (policy engine, risk scorer, evidence, enforcer, report generator)
  - entity_manager.py, knowledge_extractor.py
- Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.'
- Added 'darkplex' CLI alias alongside 'cortex'
- Package renamed to darkplex-core v0.2.0
- 405 tests passing (was 234)
- 14 new test files covering all merged modules
2026-02-12 08:43:02 +01:00

153 lines
4.9 KiB
Python

"""Evidence Collector: NATS JetStream events → ISO 27001 control mapping.
Collects governance events from NATS, maps them to ISO 27001 Annex A controls,
and stores evidence for audit reporting.
"""
from __future__ import annotations
import json
import logging
import os
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
import yaml
logger = logging.getLogger(__name__)
@dataclass
class EvidenceRecord:
"""A single piece of compliance evidence."""
timestamp: str
event_type: str
agent: str
action: str
verdict: str
risk_score: int
risk_level: str
controls: list[str] # ISO 27001 control IDs
details: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return {
"timestamp": self.timestamp,
"event_type": self.event_type,
"agent": self.agent,
"action": self.action,
"verdict": self.verdict,
"risk_score": self.risk_score,
"risk_level": self.risk_level,
"controls": self.controls,
"details": self.details,
}
class ControlMapping:
"""Maps event types to ISO 27001 Annex A controls."""
def __init__(self, mapping_path: str | None = None) -> None:
self.mapping: dict[str, list[str]] = {}
path = mapping_path or os.environ.get(
"GOVERNANCE_CONTROLS_MAPPING", "controls/iso27001-mapping.yaml"
)
self._load_mapping(path)
def _load_mapping(self, path: str) -> None:
"""Load the control mapping from YAML."""
try:
with open(path, "r") as f:
data = yaml.safe_load(f)
for mapping in data.get("mappings", []):
for event_type in mapping.get("event_types", []):
self.mapping.setdefault(event_type, []).extend(mapping.get("controls", []))
logger.info("Loaded %d event type mappings", len(self.mapping))
except FileNotFoundError:
logger.warning("Control mapping not found: %s", path)
except Exception:
logger.exception("Failed to load control mapping: %s", path)
def get_controls(self, event_type: str) -> list[str]:
"""Return ISO 27001 controls applicable to an event type."""
return self.mapping.get(event_type, [])
class EvidenceCollector:
"""Collects and stores governance evidence from agent actions.
In production, this subscribes to NATS JetStream. For testing,
evidence can be recorded directly via record().
Usage:
collector = EvidenceCollector()
collector.record(event_type="policy_evaluation", agent="claudia", ...)
"""
def __init__(self, control_mapping: ControlMapping | None = None) -> None:
self.control_mapping = control_mapping or ControlMapping()
self.evidence: list[EvidenceRecord] = []
def record(
self,
event_type: str,
agent: str,
action: str,
verdict: str,
risk_score: int = 0,
risk_level: str = "low",
details: dict[str, Any] | None = None,
) -> EvidenceRecord:
"""Record a governance evidence entry.
Args:
event_type: Type of governance event (e.g., policy_evaluation, access_request)
agent: Agent identifier
action: Action being performed
verdict: Policy verdict (allow/deny/escalate)
risk_score: Numeric risk score (0-10)
risk_level: Risk level string
details: Additional context
"""
controls = self.control_mapping.get_controls(event_type)
record = EvidenceRecord(
timestamp=datetime.now(timezone.utc).isoformat(),
event_type=event_type,
agent=agent,
action=action,
verdict=verdict,
risk_score=risk_score,
risk_level=risk_level,
controls=controls,
details=details or {},
)
self.evidence.append(record)
logger.info(
"Evidence recorded: %s by %s%s (risk: %d/%s, controls: %s)",
action, agent, verdict, risk_score, risk_level, controls,
)
return record
def get_evidence(
self,
agent: str | None = None,
control: str | None = None,
verdict: str | None = None,
) -> list[EvidenceRecord]:
"""Query evidence with optional filters."""
results = self.evidence
if agent:
results = [e for e in results if e.agent == agent]
if control:
results = [e for e in results if control in e.controls]
if verdict:
results = [e for e in results if e.verdict == verdict]
return results
def export_json(self) -> str:
"""Export all evidence as JSON."""
return json.dumps([e.to_dict() for e in self.evidence], indent=2)