"""Governance CLI — policy evaluation, risk scoring, evidence & reporting. Usage: darkplex governance evaluate --agent --action [--data-type ] [--target ] [--role ] darkplex governance risk --agent --action [--data-type ] [--target ] [--role ] darkplex governance evidence [--agent ] [--verdict ] [--control ] [--json] darkplex governance report [--agent ] [--json] [--output ] darkplex governance policies [--reload] darkplex governance status """ from __future__ import annotations import argparse import json import os import sys from pathlib import Path # Default paths DEFAULT_POLICIES_DIR = os.environ.get( "GOVERNANCE_POLICIES_DIR", str(Path(__file__).parent / "policies"), ) DEFAULT_CONTROLS_MAPPING = os.environ.get( "GOVERNANCE_CONTROLS_MAPPING", str(Path(__file__).parent / "controls" / "iso27001-mapping.yaml"), ) def _build_context(args: argparse.Namespace) -> dict: """Build an evaluation context from CLI args.""" ctx = {} if args.agent: ctx["agent"] = args.agent if args.action: ctx["action"] = args.action if args.data_type: ctx["data_type"] = args.data_type if args.target: ctx["target"] = args.target if args.role: ctx["agent_role"] = args.role return ctx def _get_engine(): from cortex.governance.policy import PolicyEngine return PolicyEngine(policies_dir=DEFAULT_POLICIES_DIR) def _get_scorer(): from cortex.governance.risk_scorer import RiskScorer return RiskScorer() def _get_enforcer(): from cortex.governance.enforcer import Enforcer from cortex.governance.policy import PolicyEngine from cortex.governance.risk_scorer import RiskScorer from cortex.governance.evidence import EvidenceCollector, ControlMapping return Enforcer( policy_engine=PolicyEngine(policies_dir=DEFAULT_POLICIES_DIR), risk_scorer=RiskScorer(), evidence_collector=EvidenceCollector( control_mapping=ControlMapping(DEFAULT_CONTROLS_MAPPING) ), ) def cmd_evaluate(args: argparse.Namespace) -> None: """Full governance evaluation: policy + risk + evidence.""" enforcer = _get_enforcer() ctx = _build_context(args) decision = enforcer.evaluate(ctx) if args.json: print(json.dumps({ "verdict": decision.verdict, "reason": decision.reason, "risk_score": decision.risk.value, "risk_level": decision.risk.level, "risk_factors": decision.risk.factors, "policy_result": decision.policy_result, }, indent=2)) else: icon = {"approve": "✅", "deny": "❌", "escalate": "⚠️"}.get(decision.verdict, "❓") print(f"{icon} Verdict: {decision.verdict.upper()}") print(f" Reason: {decision.reason}") print(f" Risk: {decision.risk.value}/10 ({decision.risk.level})") for factor, detail in decision.risk.factors.items(): print(f" • {factor}: {detail.get('value', detail)} (+{detail.get('score', 0)})") def cmd_risk(args: argparse.Namespace) -> None: """Risk scoring only.""" scorer = _get_scorer() ctx = _build_context(args) result = scorer.score(ctx) if args.json: print(json.dumps({ "risk_score": result.value, "risk_level": result.level, "acceptable": result.is_acceptable, "factors": result.factors, }, indent=2)) else: icon = "🟢" if result.is_acceptable else "🔴" print(f"{icon} Risk Score: {result.value}/10 ({result.level})") print(f" Acceptable: {'yes' if result.is_acceptable else 'NO'}") for factor, detail in result.factors.items(): print(f" • {factor}: {detail.get('value', detail)} (+{detail.get('score', 0)})") def cmd_policies(args: argparse.Namespace) -> None: """List loaded policies.""" engine = _get_engine() if not engine.policies: print("No policies loaded.") return for policy in engine.policies: print(f"📋 {policy.name} (v{policy.version})") print(f" {policy.description}") print(f" Rules: {len(policy.rules)}") for rule in policy.rules: print(f" • {rule.name} → {rule.effect} (priority: {rule.priority})") print() def cmd_status(args: argparse.Namespace) -> None: """Show governance system status.""" engine = _get_engine() scorer = _get_scorer() policies_count = len(engine.policies) rules_count = sum(len(p.rules) for p in engine.policies) policies_dir = DEFAULT_POLICIES_DIR controls_file = DEFAULT_CONTROLS_MAPPING print("🛡️ Darkplex Governance Status") print(f" Policies dir: {policies_dir}") print(f" Controls map: {controls_file}") print(f" Policies loaded: {policies_count}") print(f" Total rules: {rules_count}") print(f" Policies dir exists: {'✅' if Path(policies_dir).exists() else '❌'}") print(f" Controls file exists: {'✅' if Path(controls_file).exists() else '❌'}") def cmd_report(args: argparse.Namespace) -> None: """Generate compliance report (placeholder — needs live evidence).""" from cortex.governance.evidence import EvidenceCollector, ControlMapping from cortex.governance.report_generator import ReportGenerator collector = EvidenceCollector( control_mapping=ControlMapping(DEFAULT_CONTROLS_MAPPING) ) generator = ReportGenerator(collector) if args.agent: report = generator.generate_agent_report(args.agent) else: report = generator.generate_compliance_report() output = json.dumps(report, indent=2) if args.output: Path(args.output).write_text(output) print(f"✅ Report written to {args.output}") else: print(output) def main() -> None: parser = argparse.ArgumentParser(prog="darkplex governance", description="Governance Engine") parser.add_argument("--json", action="store_true", help="JSON output") sub = parser.add_subparsers(dest="subcmd") # evaluate p_eval = sub.add_parser("evaluate", aliases=["eval"], help="Full policy + risk evaluation") p_eval.add_argument("--agent", required=True) p_eval.add_argument("--action", required=True) p_eval.add_argument("--data-type", default="public", choices=["public", "internal", "confidential", "restricted"]) p_eval.add_argument("--target", default="internal", choices=["internal", "external"]) p_eval.add_argument("--role", default="assistant", choices=["admin", "operator", "assistant", "external"]) p_eval.add_argument("--json", action="store_true", dest="json") # risk p_risk = sub.add_parser("risk", help="Risk scoring only") p_risk.add_argument("--agent", default="unknown") p_risk.add_argument("--action", default="unknown") p_risk.add_argument("--data-type", default="public", choices=["public", "internal", "confidential", "restricted"]) p_risk.add_argument("--target", default="internal", choices=["internal", "external"]) p_risk.add_argument("--role", default="assistant", choices=["admin", "operator", "assistant", "external"]) p_risk.add_argument("--json", action="store_true", dest="json") # policies p_pol = sub.add_parser("policies", help="List loaded policies") p_pol.add_argument("--reload", action="store_true") # status sub.add_parser("status", help="Show governance status") # report p_rep = sub.add_parser("report", help="Generate compliance report") p_rep.add_argument("--agent", default=None) p_rep.add_argument("--output", "-o", default=None) p_rep.add_argument("--json", action="store_true", dest="json") args = parser.parse_args() if args.subcmd in ("evaluate", "eval"): cmd_evaluate(args) elif args.subcmd == "risk": cmd_risk(args) elif args.subcmd == "policies": cmd_policies(args) elif args.subcmd == "status": cmd_status(args) elif args.subcmd == "report": cmd_report(args) else: parser.print_help() sys.exit(1) if __name__ == "__main__": main()