Some checks failed
Tests / test (push) Failing after 5s
- Fixed bare 'from governance.' imports in source + tests - Fixed bare 'from intelligence.' imports in tests - Fixed mock.patch targets to use full cortex.xxx paths - All 405 tests passing
79 lines
3.1 KiB
Python
79 lines
3.1 KiB
Python
"""Tests for governance/enforcer.py — Runtime Enforcer."""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
import pytest
|
|
|
|
sys.path.insert(0, str(Path.home() / "repos" / "darkplex-core"))
|
|
|
|
from cortex.governance.enforcer import Enforcer, Decision
|
|
from cortex.governance.policy import PolicyEngine
|
|
from cortex.governance.risk_scorer import RiskScorer
|
|
from cortex.governance.evidence import EvidenceCollector, ControlMapping
|
|
|
|
|
|
def _make_enforcer(tmp_path, rules=None):
|
|
if rules:
|
|
policy_file = tmp_path / "test.yaml"
|
|
policy_file.write_text(yaml.dump({
|
|
"name": "test", "description": "", "version": "1",
|
|
"rules": rules,
|
|
}))
|
|
engine = PolicyEngine(policies_dir=str(tmp_path))
|
|
scorer = RiskScorer()
|
|
collector = EvidenceCollector(control_mapping=ControlMapping("/dev/null"))
|
|
return Enforcer(policy_engine=engine, risk_scorer=scorer, evidence_collector=collector)
|
|
|
|
|
|
class TestDecision:
|
|
def test_approved(self):
|
|
from governance.risk_scorer import RiskResult
|
|
d = Decision(verdict="approve", reason="ok", risk=RiskResult(0, "low", {}), policy_result={})
|
|
assert d.approved
|
|
|
|
def test_not_approved(self):
|
|
from governance.risk_scorer import RiskResult
|
|
d = Decision(verdict="deny", reason="no", risk=RiskResult(9, "critical", {}), policy_result={})
|
|
assert not d.approved
|
|
|
|
|
|
class TestEnforcer:
|
|
def test_default_allow(self, tmp_path):
|
|
enforcer = _make_enforcer(tmp_path)
|
|
decision = enforcer.evaluate({"agent": "claudia", "action": "read", "hour": 12})
|
|
assert decision.verdict == "allow"
|
|
|
|
def test_policy_deny(self, tmp_path):
|
|
enforcer = _make_enforcer(tmp_path, rules=[
|
|
{"name": "deny-ext", "conditions": {"target": "external"}, "effect": "deny", "priority": 10},
|
|
])
|
|
decision = enforcer.evaluate({"agent": "claudia", "action": "send", "target": "external", "hour": 12})
|
|
assert decision.verdict == "deny"
|
|
|
|
def test_risk_override(self, tmp_path):
|
|
"""High risk should override an allow policy to escalate."""
|
|
enforcer = _make_enforcer(tmp_path, rules=[
|
|
{"name": "allow-all", "conditions": {"agent": "claudia"}, "effect": "allow", "priority": 1},
|
|
])
|
|
decision = enforcer.evaluate({
|
|
"agent": "claudia", "action": "export",
|
|
"data_type": "restricted", "target": "external", "hour": 12,
|
|
})
|
|
# Risk should be high/critical, overriding the allow
|
|
assert decision.verdict in ("deny", "escalate")
|
|
|
|
def test_evidence_recorded(self, tmp_path):
|
|
enforcer = _make_enforcer(tmp_path)
|
|
enforcer.evaluate({"agent": "test", "action": "read", "hour": 12})
|
|
assert len(enforcer.evidence_collector.evidence) == 1
|
|
|
|
def test_data_classification_alias(self, tmp_path):
|
|
enforcer = _make_enforcer(tmp_path)
|
|
decision = enforcer.evaluate({
|
|
"agent": "test", "action": "read",
|
|
"data_classification": "confidential", "hour": 12,
|
|
})
|
|
# Should use data_classification as data_type
|
|
assert decision.risk.factors["data_type"]["value"] == "confidential"
|