# NOTE(review): CI status at last push: "Tests / test (push)" failing after 2s.
# Merge summary: merged all unique darkplex-core modules into cortex —
# intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup,
# temporal, llm_extractor, loop); governance/ subfolder (policy engine, risk scorer,
# evidence, enforcer, report generator); entity_manager.py, knowledge_extractor.py.
# Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.'.
# Added 'darkplex' CLI alias alongside 'cortex'. Package renamed to darkplex-core
# v0.2.0. 405 tests passing (was 234); 14 new test files covering merged modules.
"""Risk Scorer: context-based risk scoring for agent actions.
|
|
|
|
Risk levels:
|
|
- low (0-3): routine operations
|
|
- elevated (4-6): notable but acceptable
|
|
- high (7-8): requires escalation
|
|
- critical (9-10): auto-deny + alert
|
|
|
|
Factors: data classification, target (internal/external), agent role, time of day.
|
|
"""
|
|
|
|
from __future__ import annotations

import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

# Module-level logger; callers configure handlers/levels externally.
logger = logging.getLogger(__name__)

# Score contribution per data classification (more sensitive => higher).
DATA_WEIGHTS: dict[str, int] = dict(
    public=0,
    internal=2,
    confidential=5,
    restricted=8,
)

# Score contribution per action target; external targets are riskier.
TARGET_WEIGHTS: dict[str, int] = dict(
    internal=0,
    external=3,
)

# Score contribution per agent role. More trusted roles score lower;
# "admin" even subtracts a point.
ROLE_WEIGHTS: dict[str, int] = {
    "admin": -1,
    "operator": 0,
    "assistant": 1,
    "external": 3,
}

# Extra points added when the action happens outside 08:00-18:00.
OFF_HOURS_BONUS = 2

@dataclass
class RiskResult:
    """Outcome of a single risk assessment.

    Attributes:
        value: Final clamped score in the 0-10 range.
        level: Categorical level ("low" | "elevated" | "high" | "critical").
        factors: Per-factor breakdown of how the score was assembled.
    """

    value: int
    level: str
    factors: dict[str, Any]

    @property
    def is_acceptable(self) -> bool:
        """True when the score is at most 6 (i.e. "low" or "elevated")."""
        return not self.value > 6

def _classify_level(score: int) -> str:
    """Map a numeric score to its categorical risk level.

    Bands: <=3 "low", <=6 "elevated", <=8 "high", otherwise "critical".
    Scores below 0 fall into "low"; scores above 10 into "critical".
    """
    # Walk the ordered (upper-bound, label) table; first match wins.
    for upper_bound, label in ((3, "low"), (6, "elevated"), (8, "high")):
        if score <= upper_bound:
            return label
    return "critical"

class RiskScorer:
    """Calculates contextual risk scores for agent actions.

    Usage:
        scorer = RiskScorer()
        result = scorer.score({"data_type": "confidential", "target": "external"})
    """

    def score(self, context: dict[str, Any]) -> RiskResult:
        """Score an action context and return a RiskResult.

        Args:
            context: Dict with optional keys:
                - data_type: public|internal|confidential|restricted
                - target: internal|external
                - agent_role: admin|operator|assistant|external
                - hour: 0-23 (defaults to current hour UTC)
        """
        breakdown: dict[str, Any] = {}
        points = 0

        # The three categorical factors share the same shape:
        # (context key, default value, weight table, weight for unknown values).
        # NOTE(review): unknown data_type/target values fall back to weight 0,
        # i.e. they score as the safest category — confirm this fail-open
        # behavior is intentional.
        categorical = (
            ("data_type", "public", DATA_WEIGHTS, 0),
            ("target", "internal", TARGET_WEIGHTS, 0),
            ("agent_role", "assistant", ROLE_WEIGHTS, 1),
        )
        for key, default, table, fallback in categorical:
            raw = context.get(key, default)
            weight = table.get(raw, fallback)
            breakdown[key] = {"value": raw, "score": weight}
            points += weight

        # Time of day: off-hours (before 08:00 or from 18:00) adds a bonus.
        hour = context.get("hour")
        if hour is None:
            hour = datetime.now(timezone.utc).hour
        off_hours = not (8 <= hour < 18)
        bonus = OFF_HOURS_BONUS if off_hours else 0
        breakdown["time_of_day"] = {"hour": hour, "off_hours": off_hours, "score": bonus}
        points += bonus

        # Final score is clamped into the documented 0-10 band
        # (ROLE_WEIGHTS["admin"] can push the raw sum below zero).
        points = max(0, min(10, points))

        level = _classify_level(points)
        logger.debug("Risk score: %d (%s) — factors: %s", points, level, breakdown)

        return RiskResult(value=points, level=level, factors=breakdown)
