"""Policy Engine: loads YAML policies and evaluates agent actions against them.

Policies are human-readable YAML files, versioned in Git. Each policy defines
rules with conditions and effects (allow/deny/escalate).
"""

from __future__ import annotations

import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import yaml

logger = logging.getLogger(__name__)

# Tie-break between matches of equal priority: the lower rank wins, so the
# stricter effect is chosen (deny > escalate > allow).
_EFFECT_RANK = {"deny": 0, "escalate": 1, "allow": 2}
# Effects missing from _EFFECT_RANK are ranked alongside "allow".
_UNKNOWN_EFFECT_RANK = 2


def _value_or_default(mapping: dict[str, Any], key: str, default: Any) -> Any:
    """Return ``mapping[key]``, or ``default`` when the key is absent or null.

    ``dict.get(key, default)`` only falls back when the key is missing. A YAML
    key written with an empty value (``conditions:``) is present with the value
    ``None``, which must be treated the same way as an omitted key.
    """
    value = mapping.get(key)
    return default if value is None else value


@dataclass
class Rule:
    """A single policy rule with conditions and an effect."""

    name: str
    # Maps a context key to the required value, or to a list of accepted values.
    conditions: dict[str, Any]
    effect: str  # "allow", "deny", "escalate"
    # Higher values take precedence when several rules match.
    priority: int = 0

    def matches(self, context: dict[str, Any]) -> bool:
        """Check if all conditions match the given context.

        A rule without conditions matches every context. A context key that is
        missing, or whose value is ``None``, never satisfies a condition.

        Args:
            context: Attributes of the action being evaluated.

        Returns:
            True when every condition is satisfied, False otherwise.
        """
        for key, expected in self.conditions.items():
            actual = context.get(key)
            if actual is None:
                return False
            if isinstance(expected, list):
                # A list condition is satisfied by any one of its members.
                if actual not in expected:
                    return False
            elif actual != expected:
                return False
        return True


@dataclass
class Policy:
    """A named policy containing ordered rules."""

    name: str
    description: str
    version: str
    rules: list[Rule] = field(default_factory=list)


class PolicyEngine:
    """Loads and evaluates YAML-based governance policies.

    Usage:
        engine = PolicyEngine(policies_dir="policies/")
        result = engine.evaluate(action_context)
    """

    def __init__(self, policies_dir: str | None = None) -> None:
        """Create the engine and load every policy found on disk.

        Args:
            policies_dir: Directory holding the policy files. When omitted or
                empty, the ``GOVERNANCE_POLICIES_DIR`` environment variable is
                used, falling back to ``policies/``.
        """
        self.policies_dir = Path(
            policies_dir or os.environ.get("GOVERNANCE_POLICIES_DIR", "policies/")
        )
        self.policies: list[Policy] = []
        self._load_policies()

    def _load_policies(self) -> None:
        """Load all YAML policy files from the policies directory.

        Files matching ``*.yaml`` are read in sorted path order, except
        ``schema.yaml``, which is skipped. A file that fails to load is logged
        and skipped; the remaining files are still loaded. A missing directory
        logs a warning and loads nothing.
        """
        if not self.policies_dir.exists():
            logger.warning("Policies directory not found: %s", self.policies_dir)
            return

        for path in sorted(self.policies_dir.glob("*.yaml")):
            if path.name == "schema.yaml":
                continue
            try:
                policy = self._parse_policy(path)
            except Exception:
                # Loading boundary: one broken file must not stop the others.
                logger.exception("Failed to load policy: %s", path)
            else:
                self.policies.append(policy)
                logger.info(
                    "Loaded policy: %s (%d rules)", policy.name, len(policy.rules)
                )

    def _parse_policy(self, path: Path) -> Policy:
        """Parse a YAML file into a Policy object.

        Optional keys that are absent or explicitly null fall back to their
        defaults: no conditions, effect ``"deny"``, priority ``0``, the file
        stem as policy name, an empty description and version ``"1.0.0"``.

        Args:
            path: Location of the YAML policy file.

        Returns:
            The parsed policy.

        Raises:
            ValueError: If the document is not a YAML mapping, for example
                because the file is empty.
            KeyError: If a rule has no ``name``.
        """
        # Read as UTF-8 explicitly instead of the platform default encoding.
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)

        if not isinstance(data, dict):
            raise ValueError(
                f"Policy file {path} must contain a YAML mapping, "
                f"got {type(data).__name__}"
            )

        rules = [
            Rule(
                name=rule_data["name"],
                conditions=_value_or_default(rule_data, "conditions", {}),
                effect=_value_or_default(rule_data, "effect", "deny"),
                priority=_value_or_default(rule_data, "priority", 0),
            )
            for rule_data in _value_or_default(data, "rules", [])
        ]
        return Policy(
            name=_value_or_default(data, "name", path.stem),
            description=_value_or_default(data, "description", ""),
            version=_value_or_default(data, "version", "1.0.0"),
            rules=rules,
        )

    def evaluate(self, context: dict[str, Any]) -> dict[str, Any]:
        """Evaluate an action context against all loaded policies.

        Returns the highest-priority matching rule's effect, or 'allow' if no
        rules match. Among matches of equal priority the stricter effect wins;
        remaining ties keep policy load order.

        Args:
            context: Attributes of the action being evaluated.

        Returns:
            A dict with ``verdict`` (the winning effect), ``reason`` (text
            naming the winning policy and rule) and ``matched_rules`` (every
            match, ordered from winner to lowest precedence).
        """
        matches: list[tuple[Rule, Policy]] = [
            (rule, policy)
            for policy in self.policies
            for rule in policy.rules
            if rule.matches(context)
        ]

        if not matches:
            return {
                "verdict": "allow",
                "reason": "No matching policy rules",
                "matched_rules": [],
            }

        # Sort by priority (highest first), then by strictness
        # (deny > escalate > allow). The sort is stable.
        matches.sort(
            key=lambda m: (
                -m[0].priority,
                _EFFECT_RANK.get(m[0].effect, _UNKNOWN_EFFECT_RANK),
            )
        )
        top_rule, top_policy = matches[0]

        return {
            "verdict": top_rule.effect,
            "reason": f"Policy '{top_policy.name}', rule '{top_rule.name}'",
            "matched_rules": [
                {"policy": p.name, "rule": r.name, "effect": r.effect}
                for r, p in matches
            ],
        }

    def reload(self) -> None:
        """Reload all policies from disk.

        The existing list is emptied in place and then refilled, so the list
        object referenced by ``self.policies`` stays the same.
        """
        self.policies.clear()
        self._load_policies()