"""Runtime Enforcer: pre-execution policy check (approve/deny/escalate). The enforcer is the single entry point for all agent action governance. It orchestrates the policy engine, risk scorer, and evidence collector. """ from __future__ import annotations import logging from dataclasses import dataclass from typing import Any from cortex.governance.evidence import EvidenceCollector from cortex.governance.policy import PolicyEngine from cortex.governance.risk_scorer import RiskResult, RiskScorer logger = logging.getLogger(__name__) @dataclass class Decision: """The final governance decision for an agent action.""" verdict: str # "approve", "deny", "escalate" reason: str risk: RiskResult policy_result: dict[str, Any] @property def approved(self) -> bool: return self.verdict == "approve" class Enforcer: """Pre-execution governance enforcer. Evaluates every agent action against policies and risk scoring, records evidence, and returns a decision. Usage: enforcer = Enforcer(policy_engine, risk_scorer, evidence_collector) decision = enforcer.evaluate({"agent": "claudia", "action": "send_email", ...}) if decision.approved: execute_action() """ # Risk levels that override policy to deny/escalate RISK_OVERRIDES: dict[str, str] = { "critical": "deny", "high": "escalate", } def __init__( self, policy_engine: PolicyEngine | None = None, risk_scorer: RiskScorer | None = None, evidence_collector: EvidenceCollector | None = None, ) -> None: self.policy_engine = policy_engine or PolicyEngine() self.risk_scorer = risk_scorer or RiskScorer() self.evidence_collector = evidence_collector or EvidenceCollector() def evaluate(self, context: dict[str, Any]) -> Decision: """Evaluate an agent action and return a governance decision. Args: context: Action context dict with keys like: - agent: agent identifier - action: action name - data_type / data_classification: data sensitivity - target: internal/external - agent_role: role of the requesting agent - hour: time of day (optional) Returns: Decision with verdict, reason, risk score, and policy result. """ # Normalize data_type if "data_classification" in context and "data_type" not in context: context["data_type"] = context["data_classification"] # Step 1: Risk scoring risk = self.risk_scorer.score(context) # Step 2: Policy evaluation policy_result = self.policy_engine.evaluate(context) policy_verdict = policy_result["verdict"] # Step 3: Combine — risk can override policy to be MORE restrictive verdict = policy_verdict reason = policy_result["reason"] risk_override = self.RISK_OVERRIDES.get(risk.level) if risk_override: strictness = {"deny": 0, "escalate": 1, "allow": 2} if strictness.get(risk_override, 2) < strictness.get(verdict, 2): verdict = risk_override reason = f"Risk override ({risk.level}): {reason}" # Step 4: Record evidence agent = context.get("agent", "unknown") action = context.get("action", "unknown") self.evidence_collector.record( event_type="policy_evaluation", agent=agent, action=action, verdict=verdict, risk_score=risk.value, risk_level=risk.level, details={ "context": context, "policy_result": policy_result, "risk_factors": risk.factors, }, ) decision = Decision( verdict=verdict, reason=reason, risk=risk, policy_result=policy_result, ) logger.info( "Enforcer decision: %s → %s (risk: %d/%s)", f"{agent}/{action}", verdict, risk.value, risk.level, ) return decision