darkplex-core/cortex/sentinel.py
Claudia 0123ec7090
All checks were successful
Tests / test (push) Successful in 3s
fix: format specifier crash when stream_info is None
2026-02-09 12:51:56 +01:00

807 lines
24 KiB
Python

#!/usr/bin/env python3
"""Cortex Sentinel — Security Feed Aggregation and CVE Matching.
Consolidated from ~/clawd/scripts/sentinel/ (rss-fetch.py, db.py, cve-match.py, report-gen.py)
Features:
- RSS security feed aggregation
- SQLite-based deduplication
- CVE matching against local inventory
- Report generation (markdown + AI summary)
Usage:
cortex sentinel scan [--nmap]
cortex sentinel report [--llm]
cortex sentinel matches
cortex sentinel stats
"""
import argparse
import hashlib
import json
import os
import re
import sqlite3
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Optional
import requests
from cortex.config import cortex_home
# Try to import feedparser (optional dependency)
try:
import feedparser
HAS_FEEDPARSER = True
except ImportError:
HAS_FEEDPARSER = False
# Disable SSL warnings for problematic feeds
try:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError:
pass
# --- Configuration ---
def _env(key: str, default: str = '') -> str:
return os.environ.get(key, default)
def sentinel_dir() -> Path:
    """Base directory for sentinel data (under the cortex home dir)."""
    base = cortex_home()
    return base / "sentinel"
def sentinel_db_path() -> Path:
    """Location of the sentinel SQLite database file."""
    return sentinel_dir().joinpath("sentinel.db")
def feeds_dir() -> Path:
    """Directory for feed output files, created on first use."""
    path = sentinel_dir() / "feeds"
    path.mkdir(parents=True, exist_ok=True)
    return path
def reports_dir() -> Path:
    """Directory for report output files, created on first use."""
    path = sentinel_dir() / "reports"
    path.mkdir(parents=True, exist_ok=True)
    return path
def llm_url() -> str:
    """LLM API endpoint used for AI summaries (Ollama-style by default)."""
    default = 'http://localhost:11434/api/generate'
    return _env('CORTEX_LLM_URL', default)
def llm_model() -> str:
    """Name of the LLM model used for AI summaries."""
    default = 'mistral:7b'
    return _env('CORTEX_LLM_MODEL', default)
# User agent for HTTP requests — a browser-like UA string, presumably to
# avoid feed servers rejecting generic script clients (TODO confirm which
# feeds actually require it).
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0"
# --- Feed Configuration ---
# Registry of RSS/Atom feeds: key -> {"url", "category", optional "verify_ssl"}.
# "verify_ssl": False disables TLS certificate verification for that feed
# (used by fetch_feed; also set automatically after an SSLError).
FEEDS = {
    # Security News
    "bleepingcomputer": {
        "url": "https://www.bleepingcomputer.com/feed/",
        "category": "security-news"
    },
    "hackernews": {
        "url": "https://feeds.feedburner.com/TheHackersNews",
        "category": "security-news"
    },
    "darkreading": {
        "url": "https://www.darkreading.com/rss.xml",
        "category": "security-news"
    },
    "schneier": {
        "url": "https://www.schneier.com/feed/atom/",
        "category": "security-news"
    },
    "securityweek": {
        "url": "https://www.securityweek.com/feed/",
        "category": "security-news"
    },
    # CVE/Vulnerability Feeds
    "nvd-recent": {
        "url": "https://nvd.nist.gov/feeds/xml/cve/misc/nvd-rss.xml",
        "category": "cve",
        "verify_ssl": False
    },
    "cisa-alerts": {
        "url": "https://www.cisa.gov/cybersecurity-advisories/all.xml",
        "category": "cve",
        "verify_ssl": False
    },
    # AI/ML Security
    "huggingface-blog": {
        "url": "https://huggingface.co/blog/feed.xml",
        "category": "ai-security"
    },
    "google-ai-blog": {
        "url": "https://blog.google/technology/ai/rss/",
        "category": "ai-security"
    },
    # Exploit Databases
    "exploitdb": {
        "url": "https://www.exploit-db.com/rss.xml",
        "category": "exploits",
        "verify_ssl": False
    },
}
# Keywords that indicate relevance to our infrastructure.
# Matched as lowercase substrings against "title + summary" in fetch_feed;
# any single hit marks the alert as relevant.
RELEVANT_KEYWORDS = [
    # Tech stack
    "linux", "debian", "nginx", "traefik", "docker", "postgresql", "redis",
    "node.js", "nodejs", "python", "openssh", "git", "chromium", "openssl",
    "ollama", "llm", "whisper", "matrix", "synapse", "element",
    # Hardware
    "amd", "radeon", "rocm", "fritzbox", "avm",
    # Critical issues
    "critical", "rce", "remote code execution", "zero-day", "0-day",
    "ransomware", "supply chain", "authentication bypass",
    # AI-specific
    "prompt injection", "jailbreak", "model extraction", "adversarial",
    "llm vulnerability", "ai safety", "model poisoning"
]
# Software inventory for CVE matching.
# Structure: category -> list of items; each item has "name", optional
# "version", and "aliases" (lowercase substrings checked by
# check_inventory_match). Versions here are informational only — matching
# is alias-based and does not compare version ranges.
INVENTORY = {
    "operating_systems": [
        {"name": "Debian", "version": "12", "aliases": ["debian", "bookworm"]},
        {"name": "Linux Kernel", "version": "6.1", "aliases": ["linux", "kernel"]},
    ],
    "services": [
        {"name": "OpenSSH", "version": "9.2", "aliases": ["ssh", "openssh", "sshd"]},
        {"name": "Nginx", "version": "1.22", "aliases": ["nginx"]},
        {"name": "Traefik", "version": "2.10", "aliases": ["traefik"]},
        {"name": "Docker", "version": "24", "aliases": ["docker", "containerd"]},
        {"name": "Node.js", "version": "22", "aliases": ["node", "nodejs", "npm"]},
        {"name": "Python", "version": "3.11", "aliases": ["python", "python3"]},
        {"name": "PostgreSQL", "version": "15", "aliases": ["postgres", "postgresql"]},
        {"name": "Redis", "version": "7", "aliases": ["redis"]},
        {"name": "Ollama", "version": "0.1", "aliases": ["ollama", "llama"]},
    ],
    "applications": [
        {"name": "Chromium", "version": "120", "aliases": ["chromium", "chrome"]},
        {"name": "Git", "version": "2.39", "aliases": ["git"]},
        {"name": "OpenSSL", "version": "3.0", "aliases": ["openssl", "ssl", "tls"]},
    ],
    "hardware": [
        {"name": "AMD Radeon RX 5700 XT", "aliases": ["amd", "radeon", "rx5700", "navi", "gfx1010"]},
        {"name": "Fritz!Box", "aliases": ["fritzbox", "fritz", "avm"]},
    ]
}
# --- Database ---
def get_db() -> sqlite3.Connection:
    """Open the sentinel database; ensures the data directory exists.

    Rows are returned as sqlite3.Row so they support name-based access.
    """
    sentinel_dir().mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(sentinel_db_path())
    connection.row_factory = sqlite3.Row
    return connection
def init_db() -> None:
    """Create the alerts/runs tables and their indexes (idempotent)."""
    schema = """
    CREATE TABLE IF NOT EXISTS alerts (
        id TEXT PRIMARY KEY,
        source TEXT NOT NULL,
        category TEXT,
        title TEXT NOT NULL,
        link TEXT,
        summary TEXT,
        severity TEXT DEFAULT 'info',
        relevant INTEGER DEFAULT 0,
        first_seen TEXT NOT NULL,
        last_seen TEXT NOT NULL,
        seen_count INTEGER DEFAULT 1,
        notified INTEGER DEFAULT 0,
        acknowledged INTEGER DEFAULT 0
    );
    CREATE INDEX IF NOT EXISTS idx_alerts_source ON alerts(source);
    CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity);
    CREATE INDEX IF NOT EXISTS idx_alerts_first_seen ON alerts(first_seen);
    CREATE INDEX IF NOT EXISTS idx_alerts_notified ON alerts(notified);
    CREATE TABLE IF NOT EXISTS runs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        timestamp TEXT NOT NULL,
        total_fetched INTEGER,
        new_alerts INTEGER,
        duplicates INTEGER,
        notified INTEGER
    );
    """
    conn = get_db()
    conn.executescript(schema)
    conn.commit()
    conn.close()
def add_alert(alert: dict) -> bool:
    """Insert *alert* if unseen; otherwise bump last_seen/seen_count.

    Args:
        alert: Mapping with at least an "id" key; optional source, category,
            title, link, summary, severity and relevant fields.

    Returns:
        True if the alert was new (inserted), False if it was a duplicate.
    """
    conn = get_db()
    now = datetime.now().isoformat()
    try:
        cur = conn.execute("SELECT id, seen_count FROM alerts WHERE id = ?", (alert["id"],))
        existing = cur.fetchone()
        if existing:
            # Duplicate: refresh last_seen and count how often we re-saw it.
            conn.execute("""
                UPDATE alerts SET last_seen = ?, seen_count = seen_count + 1
                WHERE id = ?
            """, (now, alert["id"]))
            conn.commit()
            return False
        conn.execute("""
            INSERT INTO alerts (id, source, category, title, link, summary,
                                severity, relevant, first_seen, last_seen)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            alert["id"],
            alert.get("source", "unknown"),
            alert.get("category", ""),
            alert.get("title", "")[:500],   # defensive truncation
            alert.get("link", ""),
            alert.get("summary", "")[:1000],
            alert.get("severity", "info"),
            1 if alert.get("relevant") else 0,
            now,
            now
        ))
        conn.commit()
        return True
    finally:
        # Fix: original closed the connection only on the success paths and
        # would leak the handle if an execute raised.
        conn.close()
def log_run(total: int, new: int, dupes: int, notified: int = 0) -> None:
    """Record one completed sentinel run in the runs table."""
    row = (datetime.now().isoformat(), total, new, dupes, notified)
    conn = get_db()
    conn.execute(
        """
        INSERT INTO runs (timestamp, total_fetched, new_alerts, duplicates, notified)
        VALUES (?, ?, ?, ?, ?)
        """,
        row,
    )
    conn.commit()
    conn.close()
def get_stats() -> dict:
    """Collect summary statistics from the alerts and runs tables.

    Returns a dict with total_alerts, by_severity, unnotified, last_24h
    and recent_runs (last 5 runs, newest first).
    """
    conn = get_db()
    total = conn.execute("SELECT COUNT(*) FROM alerts").fetchone()[0]
    severity_rows = conn.execute("""
        SELECT severity, COUNT(*) as count FROM alerts
        GROUP BY severity ORDER BY count DESC
    """).fetchall()
    unnotified = conn.execute(
        "SELECT COUNT(*) FROM alerts WHERE notified = 0 AND relevant = 1"
    ).fetchone()[0]
    cutoff = (datetime.now() - timedelta(days=1)).isoformat()
    last_24h = conn.execute(
        "SELECT COUNT(*) FROM alerts WHERE first_seen > ?", (cutoff,)
    ).fetchone()[0]
    runs = conn.execute("SELECT * FROM runs ORDER BY timestamp DESC LIMIT 5").fetchall()
    conn.close()
    return {
        "total_alerts": total,
        "by_severity": {row["severity"]: row["count"] for row in severity_rows},
        "unnotified": unnotified,
        "last_24h": last_24h,
        "recent_runs": [dict(row) for row in runs],
    }
def get_unnotified_alerts(min_severity: str = "medium") -> list[dict]:
    """Return up to 20 relevant, not-yet-notified alerts at *min_severity* or above.

    Severity ranks: critical (1) > high (2) > medium (3) > info (4).
    Unknown severities are treated as info; an unknown *min_severity*
    defaults to the medium threshold.
    """
    severity_order = {"critical": 1, "high": 2, "medium": 3, "info": 4}
    min_level = severity_order.get(min_severity, 3)
    conn = get_db()
    cur = conn.execute("""
        SELECT * FROM alerts
        WHERE notified = 0 AND relevant = 1
        ORDER BY
            CASE severity
                WHEN 'critical' THEN 1
                WHEN 'high' THEN 2
                WHEN 'medium' THEN 3
                ELSE 4
            END,
            first_seen DESC
    """)
    alerts = [dict(row) for row in cur.fetchall()]
    conn.close()
    # Fix: the original query applied LIMIT 20 *before* the Python-side
    # severity filter, so low-severity rows could crowd out qualifying
    # alerts. Filter first, then cap the result at 20.
    matching = [a for a in alerts if severity_order.get(a["severity"], 4) <= min_level]
    return matching[:20]
def get_recent_alerts(limit: int = 50) -> list[dict]:
    """Return the *limit* most recently first-seen alerts, newest first."""
    conn = get_db()
    rows = conn.execute(
        """
        SELECT * FROM alerts
        ORDER BY first_seen DESC
        LIMIT ?
        """,
        (limit,),
    ).fetchall()
    conn.close()
    return [dict(row) for row in rows]
# --- Feed Fetching ---
def fetch_feed(name: str, config: dict) -> list[dict]:
    """Fetch and parse one RSS/Atom feed into normalized alert dicts.

    Args:
        name: Feed key, used in alert ids and log output.
        config: Feed config with "url", "category", optional "verify_ssl".
            NOTE: mutated (verify_ssl set to False) after an SSLError, so
            callers should pass a copy (fetch_all_feeds does).

    Returns:
        Up to 20 alert dicts (id, source, category, title, link, summary,
        published, severity, relevant, fetched_at); empty list on any error.
    """
    if not HAS_FEEDPARSER:
        # Fix: removed stray f-prefix from a placeholder-less string.
        print(" ⚠️ feedparser not installed", file=sys.stderr)
        return []
    url = config["url"]
    verify_ssl = config.get("verify_ssl", True)
    try:
        headers = {"User-Agent": USER_AGENT}
        response = requests.get(url, headers=headers, timeout=15, verify=verify_ssl)
        response.raise_for_status()
        feed = feedparser.parse(response.content)
        # bozo flags malformed XML; only give up when nothing was parsed.
        if feed.bozo and not feed.entries:
            print(f" ⚠️ {name}: Parse error", file=sys.stderr)
            return []
        entries = []
        for entry in feed.entries[:20]:  # Max 20 per feed
            title = entry.get("title", "No title")
            link = entry.get("link", "")
            summary = entry.get("summary", entry.get("description", ""))[:500]
            published = entry.get("published", entry.get("updated", ""))
            # Check relevance via keyword substring match
            text_check = f"{title} {summary}".lower()
            is_relevant = any(kw in text_check for kw in RELEVANT_KEYWORDS)
            # Keyword-based severity heuristic; worst category wins.
            severity = "info"
            if any(kw in text_check for kw in ["critical", "rce", "zero-day", "0-day", "ransomware"]):
                severity = "critical"
            elif any(kw in text_check for kw in ["high", "remote", "exploit", "vulnerability"]):
                severity = "high"
            elif any(kw in text_check for kw in ["medium", "moderate", "security"]):
                severity = "medium"
            entries.append({
                # md5 is fine here: the id is a dedup key, not a security token.
                "id": hashlib.md5(f"{name}:{link}".encode()).hexdigest()[:12],
                "source": name,
                "category": config["category"],
                "title": title,
                "link": link,
                "summary": summary[:300],
                "published": published,
                "severity": severity,
                "relevant": is_relevant,
                "fetched_at": datetime.now().isoformat()
            })
        print(f"{name}: {len(entries)} entries", file=sys.stderr)
        return entries
    except requests.exceptions.SSLError:
        # Retry once with certificate verification disabled; a second SSL
        # failure (verify_ssl already False) gives up.
        if verify_ssl:
            config["verify_ssl"] = False
            return fetch_feed(name, config)
        return []
    except requests.exceptions.Timeout:
        print(f"{name}: Timeout", file=sys.stderr)
        return []
    except requests.exceptions.RequestException as e:
        print(f"{name}: {type(e).__name__}", file=sys.stderr)
        return []
    except Exception as e:
        # Catch-all so one bad feed never aborts the whole scan.
        print(f"{name}: {e}", file=sys.stderr)
        return []
def fetch_all_feeds() -> tuple[list[dict], int, int]:
    """Fetch every configured feed.

    Returns:
        (entries, successful, failed) — all entries pooled together plus
        counts of feeds that yielded entries vs. those that did not.
    """
    collected: list[dict] = []
    ok_count = 0
    fail_count = 0
    for feed_name, feed_cfg in FEEDS.items():
        # Pass a copy: fetch_feed may flip verify_ssl on SSL errors.
        batch = fetch_feed(feed_name, feed_cfg.copy())
        if not batch:
            fail_count += 1
            continue
        collected.extend(batch)
        ok_count += 1
    return collected, ok_count, fail_count
# --- CVE Matching ---
def check_inventory_match(text: str, inventory: Optional[dict] = None) -> list[dict]:
    """Find inventory items whose aliases appear in *text*.

    Args:
        text: Free text (e.g. title + summary) scanned case-insensitively.
        inventory: Optional mapping of category -> item list to match
            against; defaults to the module-level INVENTORY. Each item
            needs "name" plus optional "version" and "aliases".

    Returns:
        One dict per matched item (first matching alias wins):
        {"category", "name", "version", "matched_alias"}.
    """
    if inventory is None:
        inventory = INVENTORY
    text_lower = text.lower()
    matches = []
    for category, items in inventory.items():
        for item in items:
            for alias in item.get("aliases", []):
                # NOTE: plain substring check — short aliases like "git"
                # can match inside unrelated words; kept to preserve the
                # existing matching behavior.
                if alias in text_lower:
                    matches.append({
                        "category": category,
                        "name": item["name"],
                        "version": item.get("version"),
                        "matched_alias": alias
                    })
                    break  # at most one match per inventory item
    return matches
def analyze_matches(alerts: list[dict]) -> dict:
    """Cross-reference alerts against the inventory and rank the hits.

    Mutates matching alert dicts in place (adds "inventory_matches" and
    "match_count").

    Args:
        alerts: Alert dicts with at least "title"/"summary"/"severity".

    Returns:
        Summary dict with counts, a per-category breakdown, the top 10
        critical hits and the top 20 relevant hits.
    """
    # Explicit ranking: lower is more severe.
    severity_rank = {"critical": 0, "high": 1, "medium": 2, "info": 3}
    relevant = []
    critical = []
    category_counts: dict[str, int] = {}
    for alert in alerts:
        text = f"{alert.get('title', '')} {alert.get('summary', '')}"
        matches = check_inventory_match(text)
        if not matches:
            continue
        alert["inventory_matches"] = matches
        alert["match_count"] = len(matches)
        relevant.append(alert)
        if alert.get("severity") == "critical":
            critical.append(alert)
        for match in matches:
            cat = match["category"]
            category_counts[cat] = category_counts.get(cat, 0) + 1
    # Fix: the old sort key compared raw severity strings alphabetically,
    # which ordered "info" above "medium". Use the explicit rank map so
    # medium-severity alerts sort ahead of info-severity ones.
    relevant.sort(key=lambda a: (-a.get("match_count", 0),
                                 severity_rank.get(a.get("severity", "info"), 3)))
    return {
        "analysis_time": datetime.now().isoformat(),
        "source_alerts": len(alerts),
        "relevant_alerts": len(relevant),
        "critical_relevant": len(critical),
        "category_breakdown": category_counts,
        "critical": critical[:10],
        "relevant": relevant[:20],
    }
# --- Report Generation ---
def generate_report(data: dict, use_llm: bool = False) -> str:
    """Generate markdown security report.

    Args:
        data: Analysis result from analyze_matches() — reads "relevant",
            "critical" and "relevant_alerts".
        use_llm: When True, append an AI-generated summary section.

    Returns:
        The complete report as a single markdown string.
    """
    now = datetime.now()
    lines = [
        "# 🔒 Security Sentinel Report",
        f"**Generated:** {now.strftime('%Y-%m-%d %H:%M')}",
        ""
    ]
    # Stats
    stats = get_stats()
    lines.extend([
        "## 📊 Database Stats",
        f"- **Total alerts:** {stats['total_alerts']}",
        f"- **Last 24h:** {stats['last_24h']}",
        f"- **Unnotified:** {stats['unnotified']}",
        ""
    ])
    # Matches — only rendered when the analysis found relevant alerts
    if data.get("relevant"):
        lines.extend([
            f"## 🎯 Relevant Alerts ({data['relevant_alerts']})",
            ""
        ])
        if data.get("critical"):
            lines.append("### ⚠️ Critical")
            for alert in data["critical"][:5]:
                matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
                lines.extend([
                    f"- **{alert['title'][:80]}**",
                    f" - Source: {alert.get('source', 'unknown')}",
                    f" - Affects: {matches}",
                    ""
                ])
        lines.append("### 📋 Other Relevant")
        for alert in data["relevant"][:10]:
            # Skip alerts already listed in the critical section above
            if alert in data.get("critical", []):
                continue
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            lines.append(f"- {alert['title'][:60]}... ({matches})")
        lines.append("")
    # AI Summary (optional, requires a reachable LLM endpoint)
    if use_llm and data.get("relevant"):
        lines.extend(["## 🤖 AI Summary", ""])
        summary = get_ai_summary(data["relevant"][:10])
        lines.extend([summary, ""])
    # Actions — simple heuristics based on critical hits and backlog size
    lines.extend([
        "## 📝 Recommended Actions",
        ""
    ])
    if data.get("critical"):
        lines.append("1. Review critical alerts and check for available patches")
    if stats["unnotified"] > 10:
        lines.append(f"2. Process {stats['unnotified']} unnotified alerts")
    if not data.get("critical") and stats["unnotified"] <= 10:
        lines.append("✅ No immediate actions required")
    return "\n".join(lines)
def get_ai_summary(alerts: list[dict]) -> str:
    """Summarize alerts via the configured LLM endpoint.

    Args:
        alerts: Alert dicts; at most 15 titles are sent to the model.

    Returns:
        The model's summary text, or a German-language fallback message
        (matching the prompt language) when the LLM is unreachable or
        returns a non-200 status.
    """
    if not alerts:
        return "No alerts to summarize."
    # One compact line per alert for the prompt
    alert_text = "\n".join([
        f"- [{a.get('severity', 'info').upper()}] {a.get('title', '')}"
        for a in alerts[:15]
    ])
    # Prompt is intentionally in German — the model is asked to answer in German.
    prompt = f"""Du bist ein Security-Analyst. Fasse diese Security-Alerts kurz zusammen (max 5 Sätze, Deutsch).
Fokus: Was ist kritisch? Was erfordert Aktion?
Alerts:
{alert_text}
Zusammenfassung:"""
    try:
        response = requests.post(
            llm_url(),
            json={
                "model": llm_model(),
                "prompt": prompt,
                "stream": False,  # request a single JSON response, not chunks
                "options": {"temperature": 0.3, "num_predict": 300}
            },
            timeout=60  # local models can be slow to generate
        )
        if response.status_code == 200:
            return response.json().get("response", "").strip()
    except Exception as e:
        return f"(LLM nicht erreichbar: {e})"
    # Reached on non-200 status codes
    return "(Zusammenfassung nicht verfügbar)"
# --- Commands ---
def cmd_scan(include_nmap: bool = False) -> None:
    """Scan security feeds and update database.

    Fetches all configured feeds, dedupes them against SQLite, logs the run
    and writes the new entries to feeds/alerts_latest.json.

    Args:
        include_nmap: Accepted for CLI compatibility (--nmap flag); no nmap
            scan is implemented in this function as visible here.
    """
    init_db()
    print(f"🛡️ Sentinel Scan — {datetime.now().strftime('%Y-%m-%d %H:%M')}", file=sys.stderr)
    print(f" Fetching {len(FEEDS)} feeds...", file=sys.stderr)
    all_entries, successful, failed = fetch_all_feeds()
    print(f"\n Feeds: {successful}/{successful+failed} OK", file=sys.stderr)
    # Deduplicate via SQLite
    print("\n🔍 Deduplicating...", file=sys.stderr)
    new_count = 0
    dupe_count = 0
    new_entries = []
    for entry in all_entries:
        if add_alert(entry):  # True -> first time this alert id was seen
            new_entries.append(entry)
            new_count += 1
        else:
            dupe_count += 1
    # Log run
    log_run(len(all_entries), new_count, dupe_count, 0)
    # Stats over the newly inserted entries only
    relevant_new = sum(1 for e in new_entries if e.get("relevant"))
    critical_new = sum(1 for e in new_entries if e.get("severity") == "critical")
    print(f"\n📊 Summary:", file=sys.stderr)
    print(f" Fetched: {len(all_entries)}", file=sys.stderr)
    print(f" New: {new_count} ({relevant_new} relevant, {critical_new} critical)", file=sys.stderr)
    print(f" Duplicates: {dupe_count}", file=sys.stderr)
    # Save the new entries plus run stats to a JSON snapshot
    output = {
        "fetched_at": datetime.now().isoformat(),
        "stats": {
            "total_fetched": len(all_entries),
            "new_alerts": new_count,
            "duplicates": dupe_count,
            "relevant": relevant_new,
            "critical": critical_new
        },
        "entries": new_entries
    }
    output_file = feeds_dir() / "alerts_latest.json"
    output_file.write_text(json.dumps(output, indent=2))
    print(f" Output: {output_file}", file=sys.stderr)
def cmd_matches() -> None:
    """Show CVE matches against inventory.

    Analyzes the 100 most recent alerts, prints critical and other relevant
    hits to stdout, and saves the full analysis as a timestamped JSON report.
    """
    alerts = get_recent_alerts(100)
    if not alerts:
        print("No alerts in database. Run 'cortex sentinel scan' first.")
        return
    data = analyze_matches(alerts)
    print(f"🎯 Inventory Matches ({data['relevant_alerts']} of {data['source_alerts']})\n")
    if data.get("critical"):
        print("⚠️ CRITICAL:\n")
        for alert in data["critical"][:5]:
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            print(f"{alert['title'][:70]}")
            print(f" Affects: {matches}\n")
    if data.get("relevant"):
        print("\n📋 Other relevant:\n")
        for alert in data["relevant"][:10]:
            # Skip entries already shown in the CRITICAL section
            if alert in data.get("critical", []):
                continue
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            print(f"{alert['title'][:60]}... ({matches})")
    if data.get("category_breakdown"):
        print(f"\n📊 By category: {data['category_breakdown']}")
    # Save the analysis for later inspection
    report_file = reports_dir() / f"match_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    report_file.write_text(json.dumps(data, indent=2))
def cmd_report(use_llm: bool = False) -> None:
    """Generate a markdown security report and print it to stdout.

    Writes a timestamped report file and repoints the report_latest.md
    symlink at it.
    """
    recent = get_recent_alerts(100)
    analysis = analyze_matches(recent)
    report_text = generate_report(analysis, use_llm)
    # Persist under a timestamped filename
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    report_file = reports_dir() / f"report_{stamp}.md"
    report_file.write_text(report_text)
    # Refresh the "latest" symlink; is_symlink() also catches dangling links
    latest = reports_dir() / "report_latest.md"
    if latest.exists() or latest.is_symlink():
        latest.unlink()
    latest.symlink_to(report_file.name)
    print(f"✅ Report saved: {report_file}", file=sys.stderr)
    print(report_text)
def cmd_stats() -> None:
    """Show database statistics: totals, severity breakdown, recent runs."""
    init_db()
    stats = get_stats()
    print("📊 Sentinel Stats\n")
    print(f"Total alerts: {stats['total_alerts']}")
    print(f"Last 24h: {stats['last_24h']}")
    print(f"Unnotified: {stats['unnotified']}")
    if stats.get("by_severity"):
        # Fix: dropped stray f-prefixes from placeholder-less strings.
        print("\nBy severity:")
        for sev, count in stats["by_severity"].items():
            print(f" {sev}: {count}")
    if stats.get("recent_runs"):
        print("\nRecent runs:")
        for run in stats["recent_runs"][:3]:
            ts = run.get("timestamp", "")[:16]
            # Fix: timestamp and count were concatenated with no separator
            # ("{ts}{new} new"), producing output like "2026-02-0915 new".
            print(f" {ts}: {run.get('new_alerts', 0)} new, {run.get('duplicates', 0)} dupes")
# --- Main ---
def main():
    """Parse CLI arguments and dispatch to the matching sentinel command."""
    parser = argparse.ArgumentParser(
        description='Security Feed Aggregation and CVE Matching',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Commands:
scan Fetch security feeds and update database
matches Show alerts matching local inventory
report Generate markdown security report
stats Show database statistics
Examples:
cortex sentinel scan
cortex sentinel matches
cortex sentinel report --llm
'''
    )
    sub = parser.add_subparsers(dest='command')
    # scan: feed aggregation (optional nmap flag)
    scan_parser = sub.add_parser('scan', help='Fetch security feeds')
    scan_parser.add_argument('--nmap', action='store_true',
                             help='Include network scan (slow)')
    # matches: inventory cross-reference
    sub.add_parser('matches', help='Show inventory matches')
    # report: markdown report (optional AI summary)
    report_parser = sub.add_parser('report', help='Generate report')
    report_parser.add_argument('--llm', action='store_true',
                               help='Include AI summary')
    # stats: database overview
    sub.add_parser('stats', help='Show database stats')
    args = parser.parse_args()
    command = args.command
    if command == 'scan':
        cmd_scan(getattr(args, 'nmap', False))
    elif command == 'matches':
        cmd_matches()
    elif command == 'report':
        cmd_report(getattr(args, 'llm', False))
    elif command == 'stats':
        cmd_stats()
    else:
        # No subcommand given: show usage instead of failing
        parser.print_help()


if __name__ == '__main__':
    main()