darkplex-core/tests/test_governance_enforcer.py
Claudia c5e5ce9dc0
Some checks failed
Tests / test (push) Failing after 5s
fix: all imports updated to cortex.xxx namespace — 405 tests green
- Fixed bare 'from governance.' imports in source + tests
- Fixed bare 'from intelligence.' imports in tests
- Fixed mock.patch targets to use full cortex.xxx paths
- All 405 tests passing
2026-02-12 08:47:45 +01:00

79 lines
3.1 KiB
Python

"""Tests for governance/enforcer.py — Runtime Enforcer."""
import sys
from pathlib import Path
import yaml
import pytest
sys.path.insert(0, str(Path.home() / "repos" / "darkplex-core"))
from cortex.governance.enforcer import Enforcer, Decision
from cortex.governance.policy import PolicyEngine
from cortex.governance.risk_scorer import RiskScorer
from cortex.governance.evidence import EvidenceCollector, ControlMapping
def _make_enforcer(tmp_path, rules=None):
    """Build an Enforcer whose PolicyEngine reads from *tmp_path*.

    If *rules* is given (a non-empty list of rule dicts), a minimal policy
    document containing them is written to ``test.yaml`` first so the
    engine loads it; otherwise the policy directory is left empty.
    """
    if rules:
        policy_doc = {
            "name": "test",
            "description": "",
            "version": "1",
            "rules": rules,
        }
        (tmp_path / "test.yaml").write_text(yaml.dump(policy_doc))
    return Enforcer(
        policy_engine=PolicyEngine(policies_dir=str(tmp_path)),
        risk_scorer=RiskScorer(),
        evidence_collector=EvidenceCollector(
            control_mapping=ControlMapping("/dev/null")
        ),
    )
class TestDecision:
    """Unit tests for Decision's `approved` property."""

    def test_approved(self):
        """A verdict of "approve" reports approved == True."""
        # Fixed: import via the cortex.* namespace like the rest of this
        # file (L22-L24); the bare `governance.` path is the pre-rename
        # layout and fails to resolve after the namespace migration.
        from cortex.governance.risk_scorer import RiskResult
        d = Decision(verdict="approve", reason="ok", risk=RiskResult(0, "low", {}), policy_result={})
        assert d.approved

    def test_not_approved(self):
        """A verdict of "deny" reports approved == False."""
        from cortex.governance.risk_scorer import RiskResult
        d = Decision(verdict="deny", reason="no", risk=RiskResult(9, "critical", {}), policy_result={})
        assert not d.approved
class TestEnforcer:
    """End-to-end checks of Enforcer.evaluate across policy and risk paths."""

    def test_default_allow(self, tmp_path):
        """With no policies on disk, a benign request is allowed."""
        result = _make_enforcer(tmp_path).evaluate(
            {"agent": "claudia", "action": "read", "hour": 12}
        )
        assert result.verdict == "allow"

    def test_policy_deny(self, tmp_path):
        """A matching deny rule produces a deny verdict."""
        deny_rule = {"name": "deny-ext", "conditions": {"target": "external"}, "effect": "deny", "priority": 10}
        enforcer = _make_enforcer(tmp_path, rules=[deny_rule])
        result = enforcer.evaluate(
            {"agent": "claudia", "action": "send", "target": "external", "hour": 12}
        )
        assert result.verdict == "deny"

    def test_risk_override(self, tmp_path):
        """High risk should override an allow policy to escalate."""
        allow_rule = {"name": "allow-all", "conditions": {"agent": "claudia"}, "effect": "allow", "priority": 1}
        enforcer = _make_enforcer(tmp_path, rules=[allow_rule])
        result = enforcer.evaluate({
            "agent": "claudia", "action": "export",
            "data_type": "restricted", "target": "external", "hour": 12,
        })
        # The allow rule matches the agent, but the risky context (restricted
        # data leaving to an external target) must still yield a stricter verdict.
        assert result.verdict in ("deny", "escalate")

    def test_evidence_recorded(self, tmp_path):
        """Every evaluate() call leaves exactly one evidence record behind."""
        enforcer = _make_enforcer(tmp_path)
        enforcer.evaluate({"agent": "test", "action": "read", "hour": 12})
        assert len(enforcer.evidence_collector.evidence) == 1

    def test_data_classification_alias(self, tmp_path):
        """"data_classification" is accepted as an alias for "data_type"."""
        result = _make_enforcer(tmp_path).evaluate({
            "agent": "test", "action": "read",
            "data_classification": "confidential", "hour": 12,
        })
        assert result.risk.factors["data_type"]["value"] == "confidential"