Some checks failed
Tests / test (push) Failing after 2s
- Merged all unique darkplex-core modules into cortex: - intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup, temporal, llm_extractor, loop) - governance/ subfolder (policy engine, risk scorer, evidence, enforcer, report generator) - entity_manager.py, knowledge_extractor.py - Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.' - Added 'darkplex' CLI alias alongside 'cortex' - Package renamed to darkplex-core v0.2.0 - 405 tests passing (was 234) - 14 new test files covering all merged modules
143 lines
4.5 KiB
Python
143 lines
4.5 KiB
Python
"""Policy Engine: loads YAML policies and evaluates agent actions against them.
|
|
|
|
Policies are human-readable YAML files, versioned in Git. Each policy defines
|
|
rules with conditions and effects (allow/deny/escalate).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class Rule:
|
|
"""A single policy rule with conditions and an effect."""
|
|
|
|
name: str
|
|
conditions: dict[str, Any]
|
|
effect: str # "allow", "deny", "escalate"
|
|
priority: int = 0
|
|
|
|
def matches(self, context: dict[str, Any]) -> bool:
|
|
"""Check if all conditions match the given context."""
|
|
for key, expected in self.conditions.items():
|
|
actual = context.get(key)
|
|
if actual is None:
|
|
return False
|
|
if isinstance(expected, list):
|
|
if actual not in expected:
|
|
return False
|
|
elif actual != expected:
|
|
return False
|
|
return True
|
|
|
|
|
|
@dataclass
|
|
class Policy:
|
|
"""A named policy containing ordered rules."""
|
|
|
|
name: str
|
|
description: str
|
|
version: str
|
|
rules: list[Rule] = field(default_factory=list)
|
|
|
|
|
|
class PolicyEngine:
|
|
"""Loads and evaluates YAML-based governance policies.
|
|
|
|
Usage:
|
|
engine = PolicyEngine(policies_dir="policies/")
|
|
result = engine.evaluate(action_context)
|
|
"""
|
|
|
|
def __init__(self, policies_dir: str | None = None) -> None:
|
|
self.policies_dir = Path(
|
|
policies_dir or os.environ.get("GOVERNANCE_POLICIES_DIR", "policies/")
|
|
)
|
|
self.policies: list[Policy] = []
|
|
self._load_policies()
|
|
|
|
def _load_policies(self) -> None:
|
|
"""Load all YAML policy files from the policies directory."""
|
|
if not self.policies_dir.exists():
|
|
logger.warning("Policies directory not found: %s", self.policies_dir)
|
|
return
|
|
|
|
for path in sorted(self.policies_dir.glob("*.yaml")):
|
|
if path.name == "schema.yaml":
|
|
continue
|
|
try:
|
|
policy = self._parse_policy(path)
|
|
self.policies.append(policy)
|
|
logger.info("Loaded policy: %s (%d rules)", policy.name, len(policy.rules))
|
|
except Exception:
|
|
logger.exception("Failed to load policy: %s", path)
|
|
|
|
def _parse_policy(self, path: Path) -> Policy:
|
|
"""Parse a YAML file into a Policy object."""
|
|
with open(path, "r") as f:
|
|
data = yaml.safe_load(f)
|
|
|
|
rules = []
|
|
for rule_data in data.get("rules", []):
|
|
rules.append(
|
|
Rule(
|
|
name=rule_data["name"],
|
|
conditions=rule_data.get("conditions", {}),
|
|
effect=rule_data.get("effect", "deny"),
|
|
priority=rule_data.get("priority", 0),
|
|
)
|
|
)
|
|
|
|
return Policy(
|
|
name=data.get("name", path.stem),
|
|
description=data.get("description", ""),
|
|
version=data.get("version", "1.0.0"),
|
|
rules=rules,
|
|
)
|
|
|
|
def evaluate(self, context: dict[str, Any]) -> dict[str, Any]:
|
|
"""Evaluate an action context against all loaded policies.
|
|
|
|
Returns the highest-priority matching rule's effect, or 'allow' if no rules match.
|
|
"""
|
|
matches: list[tuple[Rule, Policy]] = []
|
|
|
|
for policy in self.policies:
|
|
for rule in policy.rules:
|
|
if rule.matches(context):
|
|
matches.append((rule, policy))
|
|
|
|
if not matches:
|
|
return {
|
|
"verdict": "allow",
|
|
"reason": "No matching policy rules",
|
|
"matched_rules": [],
|
|
}
|
|
|
|
# Sort by priority (highest first), then by strictness (deny > escalate > allow)
|
|
effect_order = {"deny": 0, "escalate": 1, "allow": 2}
|
|
matches.sort(key=lambda m: (-m[0].priority, effect_order.get(m[0].effect, 2)))
|
|
|
|
top_rule, top_policy = matches[0]
|
|
return {
|
|
"verdict": top_rule.effect,
|
|
"reason": f"Policy '{top_policy.name}', rule '{top_rule.name}'",
|
|
"matched_rules": [
|
|
{"policy": p.name, "rule": r.name, "effect": r.effect}
|
|
for r, p in matches
|
|
],
|
|
}
|
|
|
|
def reload(self) -> None:
|
|
"""Reload all policies from disk."""
|
|
self.policies.clear()
|
|
self._load_policies()
|