# darkplex-core/cortex/governance/enforcer.py
"""Runtime Enforcer: pre-execution policy check (approve/deny/escalate).
The enforcer is the single entry point for all agent action governance.
It orchestrates the policy engine, risk scorer, and evidence collector.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
from cortex.governance.evidence import EvidenceCollector
from cortex.governance.policy import PolicyEngine
from cortex.governance.risk_scorer import RiskResult, RiskScorer
logger = logging.getLogger(__name__)


@dataclass
class Decision:
    """The final governance decision for an agent action."""

    verdict: str  # "approve", "deny", "escalate"
    reason: str
    risk: RiskResult
    policy_result: dict[str, Any]

    @property
    def approved(self) -> bool:
        return self.verdict == "approve"


class Enforcer:
    """Pre-execution governance enforcer.

    Evaluates every agent action against policies and risk scoring,
    records evidence, and returns a decision.

    Usage:
        enforcer = Enforcer(policy_engine, risk_scorer, evidence_collector)
        decision = enforcer.evaluate({"agent": "claudia", "action": "send_email", ...})
        if decision.approved:
            execute_action()
    """

    # Risk levels that override policy to deny/escalate
    RISK_OVERRIDES: dict[str, str] = {
        "critical": "deny",
        "high": "escalate",
    }

    def __init__(
        self,
        policy_engine: PolicyEngine | None = None,
        risk_scorer: RiskScorer | None = None,
        evidence_collector: EvidenceCollector | None = None,
    ) -> None:
        self.policy_engine = policy_engine or PolicyEngine()
        self.risk_scorer = risk_scorer or RiskScorer()
        self.evidence_collector = evidence_collector or EvidenceCollector()

    def evaluate(self, context: dict[str, Any]) -> Decision:
        """Evaluate an agent action and return a governance decision.

        Args:
            context: Action context dict with keys like:
                - agent: agent identifier
                - action: action name
                - data_type / data_classification: data sensitivity
                - target: internal/external
                - agent_role: role of the requesting agent
                - hour: time of day (optional)

        Returns:
            Decision with verdict, reason, risk score, and policy result.
        """
        # Normalize data_type: accept data_classification as an alias
        if "data_classification" in context and "data_type" not in context:
            context["data_type"] = context["data_classification"]

        # Step 1: Risk scoring
        risk = self.risk_scorer.score(context)

        # Step 2: Policy evaluation
        policy_result = self.policy_engine.evaluate(context)
        policy_verdict = policy_result["verdict"]

        # Step 3: Combine verdicts; risk can override policy to be MORE restrictive
        verdict = policy_verdict
        reason = policy_result["reason"]
        risk_override = self.RISK_OVERRIDES.get(risk.level)
        if risk_override:
            # Lower rank = more restrictive; unknown verdicts rank least restrictive
            strictness = {"deny": 0, "escalate": 1, "approve": 2}
            if strictness.get(risk_override, 2) < strictness.get(verdict, 2):
                verdict = risk_override
                reason = f"Risk override ({risk.level}): {reason}"

        # Step 4: Record evidence
        agent = context.get("agent", "unknown")
        action = context.get("action", "unknown")
        self.evidence_collector.record(
            event_type="policy_evaluation",
            agent=agent,
            action=action,
            verdict=verdict,
            risk_score=risk.value,
            risk_level=risk.level,
            details={
                "context": context,
                "policy_result": policy_result,
                "risk_factors": risk.factors,
            },
        )

        decision = Decision(
            verdict=verdict,
            reason=reason,
            risk=risk,
            policy_result=policy_result,
        )
        logger.info(
            "Enforcer decision: %s -> %s (risk: %d/%s)",
            f"{agent}/{action}", verdict, risk.value, risk.level,
        )
        return decision
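

if __name__ == "__main__":
    # Minimal usage sketch, not part of the library API: exercises evaluate()
    # with the default PolicyEngine/RiskScorer/EvidenceCollector wiring from
    # __init__. The context keys mirror the evaluate() docstring; the concrete
    # values below are illustrative assumptions, and the resulting verdict
    # depends entirely on the policies and risk factors configured in
    # cortex.governance.
    enforcer = Enforcer()
    decision = enforcer.evaluate(
        {
            "agent": "claudia",
            "action": "send_email",
            "data_classification": "internal",
            "target": "external",
            "agent_role": "assistant",
            "hour": 14,
        }
    )
    print(f"verdict={decision.verdict} approved={decision.approved}")
    print(f"reason={decision.reason}")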