darkplex-core/cortex/governance/cli.py
Claudia fd7d75c0ed
Some checks failed
Tests / test (push) Failing after 2s
Merge darkplex-core into cortex — unified intelligence layer v0.2.0
- Merged all unique darkplex-core modules into cortex:
  - intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup, temporal, llm_extractor, loop)
  - governance/ subfolder (policy engine, risk scorer, evidence, enforcer, report generator)
  - entity_manager.py, knowledge_extractor.py
- Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.'
- Added 'darkplex' CLI alias alongside 'cortex'
- Package renamed to darkplex-core v0.2.0
- 405 tests passing (was 234)
- 14 new test files covering all merged modules
2026-02-12 08:43:02 +01:00

228 lines
8.1 KiB
Python

"""Governance CLI — policy evaluation, risk scoring, evidence & reporting.
Usage:
    darkplex governance evaluate --agent <name> --action <action> [--data-type <type>] [--target <target>] [--role <role>] [--json]
    darkplex governance risk [--agent <name>] [--action <action>] [--data-type <type>] [--target <target>] [--role <role>] [--json]
    darkplex governance evidence [--agent <name>] [--verdict <verdict>] [--control <id>] [--json]
    darkplex governance report [--agent <name>] [--json] [--output <path>]
    darkplex governance policies [--reload]
    darkplex governance status

Note: the ``evidence`` subcommand is documented above, but ``main()`` in this
module does not register it.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
# Default filesystem locations.
# Each value comes from its environment variable when that variable is set;
# otherwise it falls back to a path located next to this module.
DEFAULT_POLICIES_DIR = os.getenv(
    "GOVERNANCE_POLICIES_DIR",
    str(Path(__file__).with_name("policies")),
)
DEFAULT_CONTROLS_MAPPING = os.getenv(
    "GOVERNANCE_CONTROLS_MAPPING",
    str(Path(__file__).parent.joinpath("controls", "iso27001-mapping.yaml")),
)
def _build_context(args: argparse.Namespace) -> dict:
    """Translate parsed CLI options into the context dict passed to the evaluators.

    Only options whose value is truthy are copied. ``--role`` is stored under
    the ``agent_role`` key; every other option keeps its attribute name.

    Args:
        args: Parsed namespace providing ``agent``, ``action``, ``data_type``,
            ``target`` and ``role`` attributes.

    Returns:
        A new dict containing the truthy options, in the order listed above.
    """
    # (context key, namespace attribute) pairs, in insertion order.
    key_to_attr = (
        ("agent", "agent"),
        ("action", "action"),
        ("data_type", "data_type"),
        ("target", "target"),
        ("agent_role", "role"),
    )
    context = {}
    for key, attr in key_to_attr:
        value = getattr(args, attr)
        if value:
            context[key] = value
    return context
def _get_engine():
    """Create a ``PolicyEngine`` that reads from ``DEFAULT_POLICIES_DIR``.

    The import runs at call time rather than at module import time.
    """
    # NOTE(review): ``governance`` is imported as a top-level package here;
    # confirm that name resolves where this module is installed.
    from governance.policy import PolicyEngine

    engine = PolicyEngine(policies_dir=DEFAULT_POLICIES_DIR)
    return engine
def _get_scorer():
    """Create a ``RiskScorer`` using its constructor defaults.

    The import runs at call time rather than at module import time.
    """
    from governance.risk_scorer import RiskScorer

    scorer = RiskScorer()
    return scorer
def _get_enforcer():
    """Assemble an ``Enforcer`` from freshly built collaborators.

    The enforcer receives a policy engine reading ``DEFAULT_POLICIES_DIR``,
    a default risk scorer, and an evidence collector whose control mapping is
    loaded from ``DEFAULT_CONTROLS_MAPPING``.
    """
    # NOTE(review): ``governance`` is imported as a top-level package here;
    # confirm that name resolves where this module is installed.
    from governance.enforcer import Enforcer
    from governance.policy import PolicyEngine
    from governance.risk_scorer import RiskScorer
    from governance.evidence import EvidenceCollector, ControlMapping

    # Built in the same order the collaborators are passed to Enforcer.
    engine = PolicyEngine(policies_dir=DEFAULT_POLICIES_DIR)
    scorer = RiskScorer()
    mapping = ControlMapping(DEFAULT_CONTROLS_MAPPING)
    collector = EvidenceCollector(control_mapping=mapping)
    return Enforcer(
        policy_engine=engine,
        risk_scorer=scorer,
        evidence_collector=collector,
    )
def cmd_evaluate(args: argparse.Namespace) -> None:
    """Run the full governance evaluation and print the decision.

    The context built from ``args`` is passed to the enforcer's ``evaluate``.
    With ``args.json`` set, the verdict, reason, risk score/level/factors and
    policy result are printed as one JSON object. Otherwise a short summary
    is printed, followed by one line per risk factor.

    Args:
        args: Parsed namespace with the context options and a ``json`` flag.
    """
    decision = _get_enforcer().evaluate(_build_context(args))
    risk = decision.risk

    if args.json:
        payload = {
            "verdict": decision.verdict,
            "reason": decision.reason,
            "risk_score": risk.value,
            "risk_level": risk.level,
            "risk_factors": risk.factors,
            "policy_result": decision.policy_result,
        }
        print(json.dumps(payload, indent=2))
        return

    # Every known verdict gets a marker; previously only "escalate" had one
    # and the others printed a dangling leading space.
    icons = {"approve": "✅", "deny": "❌", "escalate": "⚠️"}
    icon = icons.get(decision.verdict, "")
    # lstrip() keeps the line flush-left for verdicts without a marker.
    print(f"{icon} Verdict: {decision.verdict.upper()}".lstrip())
    print(f" Reason: {decision.reason}")
    print(f" Risk: {risk.value}/10 ({risk.level})")
    for factor, detail in risk.factors.items():
        # assumes each factor detail is a dict-like with optional
        # "value"/"score" keys — TODO confirm against the risk scorer
        print(f"{factor}: {detail.get('value', detail)} (+{detail.get('score', 0)})")
def cmd_risk(args: argparse.Namespace) -> None:
    """Score the risk of the described action and print the result.

    Only the risk scorer is called. With ``args.json`` set the score, level,
    acceptability and factors are printed as one JSON object. Otherwise a
    short summary is printed, followed by one line per risk factor.

    Args:
        args: Parsed namespace with the context options and a ``json`` flag.
    """
    assessment = _get_scorer().score(_build_context(args))

    if not args.json:
        marker = "🔴"
        if assessment.is_acceptable:
            marker = "🟢"
        print(f"{marker} Risk Score: {assessment.value}/10 ({assessment.level})")
        answer = "yes" if assessment.is_acceptable else "NO"
        print(f" Acceptable: {answer}")
        for name, info in assessment.factors.items():
            shown = info.get("value", info)
            points = info.get("score", 0)
            print(f"{name}: {shown} (+{points})")
        return

    summary = {
        "risk_score": assessment.value,
        "risk_level": assessment.level,
        "acceptable": assessment.is_acceptable,
        "factors": assessment.factors,
    }
    print(json.dumps(summary, indent=2))
def cmd_policies(args: argparse.Namespace) -> None:
    """Print every loaded policy together with its rules.

    For each policy the name, version, description and rule count are shown,
    then one line per rule with its name, effect and priority. A blank line
    separates policies. If the engine has no policies, a single notice is
    printed instead.

    Args:
        args: Parsed namespace; none of its attributes are read here.
    """
    # NOTE(review): ``--reload`` is accepted by the parser but never read
    # here. A new engine is built on each call; confirm whether an explicit
    # reload is still needed.
    policies = _get_engine().policies
    if not policies:
        print("No policies loaded.")
        return

    for policy in policies:
        print(f"📋 {policy.name} (v{policy.version})")
        print(f" {policy.description}")
        print(f" Rules: {len(policy.rules)}")
        for rule in policy.rules:
            # Separate name and effect; they were previously glued together.
            print(f"{rule.name} → {rule.effect} (priority: {rule.priority})")
        print()
def cmd_status(args: argparse.Namespace) -> None:
    """Print a summary of the governance configuration.

    Shows the configured policies directory and controls-mapping file, how
    many policies and rules the engine loaded, and whether each of the two
    paths exists on disk.

    Args:
        args: Parsed namespace; none of its attributes are read here.
    """
    engine = _get_engine()
    # NOTE(review): the scorer instance is not used below. The call is kept
    # so this command still fails if the scorer cannot be built; confirm
    # that this is intended.
    _get_scorer()

    policies = engine.policies
    policies_count = len(policies)
    rules_count = sum(len(p.rules) for p in policies)
    policies_dir = DEFAULT_POLICIES_DIR
    controls_file = DEFAULT_CONTROLS_MAPPING

    print("🛡️ Darkplex Governance Status")
    print(f" Policies dir: {policies_dir}")
    print(f" Controls map: {controls_file}")
    print(f" Policies loaded: {policies_count}")
    print(f" Total rules: {rules_count}")
    # Both branches used to be empty strings, so these lines carried no
    # information; the wording mirrors the yes/NO used by cmd_risk.
    print(f" Policies dir exists: {'yes' if Path(policies_dir).exists() else 'NO'}")
    print(f" Controls file exists: {'yes' if Path(controls_file).exists() else 'NO'}")
def cmd_report(args: argparse.Namespace) -> None:
    """Generate a compliance report and print it or write it to a file.

    An agent-specific report is produced when ``args.agent`` is truthy,
    otherwise the general compliance report. The report is always rendered
    as indented JSON; ``args.json`` is not consulted. The evidence collector
    is created inside this function (original author's note: placeholder —
    needs live evidence).

    Args:
        args: Parsed namespace with ``agent`` and ``output`` attributes.
    """
    from governance.evidence import EvidenceCollector, ControlMapping
    from governance.report_generator import ReportGenerator

    mapping = ControlMapping(DEFAULT_CONTROLS_MAPPING)
    generator = ReportGenerator(EvidenceCollector(control_mapping=mapping))

    report = (
        generator.generate_agent_report(args.agent)
        if args.agent
        else generator.generate_compliance_report()
    )
    rendered = json.dumps(report, indent=2)

    if not args.output:
        print(rendered)
        return

    Path(args.output).write_text(rendered)
    print(f"✅ Report written to {args.output}")
def _build_parser() -> argparse.ArgumentParser:
    """Construct the ``darkplex governance`` parser with all its subcommands."""
    parser = argparse.ArgumentParser(prog="darkplex governance", description="Governance Engine")
    parser.add_argument("--json", action="store_true", help="JSON output")
    sub = parser.add_subparsers(dest="subcmd")

    def add_json_flag(subparser: argparse.ArgumentParser) -> None:
        # argparse copies every attribute of a subparser's namespace, defaults
        # included, over the parent namespace. A plain store_true default of
        # False would therefore erase a ``--json`` given before the
        # subcommand. SUPPRESS leaves the attribute unset unless the flag is
        # actually passed to the subcommand.
        subparser.add_argument(
            "--json", action="store_true", dest="json", default=argparse.SUPPRESS
        )

    def add_classification_options(subparser: argparse.ArgumentParser) -> None:
        # Options shared by "evaluate" and "risk".
        subparser.add_argument(
            "--data-type",
            default="public",
            choices=["public", "internal", "confidential", "restricted"],
        )
        subparser.add_argument("--target", default="internal", choices=["internal", "external"])
        subparser.add_argument(
            "--role",
            default="assistant",
            choices=["admin", "operator", "assistant", "external"],
        )

    # evaluate
    p_eval = sub.add_parser("evaluate", aliases=["eval"], help="Full policy + risk evaluation")
    p_eval.add_argument("--agent", required=True)
    p_eval.add_argument("--action", required=True)
    add_classification_options(p_eval)
    add_json_flag(p_eval)

    # risk
    p_risk = sub.add_parser("risk", help="Risk scoring only")
    p_risk.add_argument("--agent", default="unknown")
    p_risk.add_argument("--action", default="unknown")
    add_classification_options(p_risk)
    add_json_flag(p_risk)

    # policies
    p_pol = sub.add_parser("policies", help="List loaded policies")
    p_pol.add_argument("--reload", action="store_true")

    # status
    sub.add_parser("status", help="Show governance status")

    # report
    p_rep = sub.add_parser("report", help="Generate compliance report")
    p_rep.add_argument("--agent", default=None)
    p_rep.add_argument("--output", "-o", default=None)
    add_json_flag(p_rep)

    return parser


def main(argv: "list[str] | None" = None) -> None:
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, as before this parameter existed.

    Raises:
        SystemExit: With code 1 after printing help when no subcommand is
            given. argparse itself also exits on invalid arguments.
    """
    parser = _build_parser()
    args = parser.parse_args(argv)

    # The "eval" alias is stored verbatim in ``subcmd``, so accept both names.
    if args.subcmd in ("evaluate", "eval"):
        cmd_evaluate(args)
    elif args.subcmd == "risk":
        cmd_risk(args)
    elif args.subcmd == "policies":
        cmd_policies(args)
    elif args.subcmd == "status":
        cmd_status(args)
    elif args.subcmd == "report":
        cmd_report(args)
    else:
        parser.print_help()
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()