807 lines
24 KiB
Python
807 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""Cortex Sentinel — Security Feed Aggregation and CVE Matching.
|
|
|
|
Consolidated from ~/clawd/scripts/sentinel/ (rss-fetch.py, db.py, cve-match.py, report-gen.py)
|
|
|
|
Features:
|
|
- RSS security feed aggregation
|
|
- SQLite-based deduplication
|
|
- CVE matching against local inventory
|
|
- Report generation (markdown + AI summary)
|
|
|
|
Usage:
|
|
cortex sentinel scan [--nmap]
|
|
cortex sentinel report [--llm]
|
|
cortex sentinel matches
|
|
cortex sentinel stats
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import requests
|
|
|
|
from cortex.config import cortex_home
|
|
|
|
# Try to import feedparser (optional dependency)
|
|
try:
|
|
import feedparser
|
|
HAS_FEEDPARSER = True
|
|
except ImportError:
|
|
HAS_FEEDPARSER = False
|
|
|
|
# Disable SSL warnings for problematic feeds
|
|
try:
|
|
import urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
# --- Configuration ---
|
|
|
|
def _env(key: str, default: str = '') -> str:
|
|
return os.environ.get(key, default)
|
|
|
|
|
|
def sentinel_dir() -> Path:
    """Base directory for sentinel data: ``<cortex_home>/sentinel``."""
    return cortex_home() / "sentinel"
|
|
|
|
|
|
def sentinel_db_path() -> Path:
    """Path of the sentinel SQLite database file inside the data directory."""
    return sentinel_dir().joinpath("sentinel.db")
|
|
|
|
|
|
def feeds_dir() -> Path:
    """Directory for feed output files; created on first access."""
    path = sentinel_dir() / "feeds"
    path.mkdir(parents=True, exist_ok=True)
    return path
|
|
|
|
|
|
def reports_dir() -> Path:
    """Directory for generated reports; created on first access."""
    path = sentinel_dir() / "reports"
    path.mkdir(parents=True, exist_ok=True)
    return path
|
|
|
|
|
|
def llm_url() -> str:
    """LLM API endpoint for AI summaries (override via CORTEX_LLM_URL)."""
    default_endpoint = 'http://localhost:11434/api/generate'
    return _env('CORTEX_LLM_URL', default_endpoint)
|
|
|
|
|
|
def llm_model() -> str:
    """LLM model name to request (override via CORTEX_LLM_MODEL)."""
    default_model = 'mistral:7b'
    return _env('CORTEX_LLM_MODEL', default_model)
|
|
|
|
|
|
# User agent for HTTP requests — a browser-like UA string, since some feeds
# reject the default python-requests user agent.
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0"
|
|
|
|
|
|
# --- Feed Configuration ---

# Feed registry: name -> {"url", "category", optional "verify_ssl"}.
# "verify_ssl": False is set per-feed for endpoints with known-problematic TLS;
# fetch_feed() also falls back to unverified once on an SSLError.
FEEDS = {
    # Security News
    "bleepingcomputer": {
        "url": "https://www.bleepingcomputer.com/feed/",
        "category": "security-news"
    },
    "hackernews": {
        "url": "https://feeds.feedburner.com/TheHackersNews",
        "category": "security-news"
    },
    "darkreading": {
        "url": "https://www.darkreading.com/rss.xml",
        "category": "security-news"
    },
    "schneier": {
        "url": "https://www.schneier.com/feed/atom/",
        "category": "security-news"
    },
    "securityweek": {
        "url": "https://www.securityweek.com/feed/",
        "category": "security-news"
    },

    # CVE/Vulnerability Feeds
    "nvd-recent": {
        "url": "https://nvd.nist.gov/feeds/xml/cve/misc/nvd-rss.xml",
        "category": "cve",
        "verify_ssl": False
    },
    "cisa-alerts": {
        "url": "https://www.cisa.gov/cybersecurity-advisories/all.xml",
        "category": "cve",
        "verify_ssl": False
    },

    # AI/ML Security
    "huggingface-blog": {
        "url": "https://huggingface.co/blog/feed.xml",
        "category": "ai-security"
    },
    "google-ai-blog": {
        "url": "https://blog.google/technology/ai/rss/",
        "category": "ai-security"
    },

    # Exploit Databases
    "exploitdb": {
        "url": "https://www.exploit-db.com/rss.xml",
        "category": "exploits",
        "verify_ssl": False
    },
}
|
|
|
|
|
|
# Keywords that indicate relevance to our infrastructure.
# fetch_feed() matches these as lowercase *substrings* of "title summary",
# so short entries like "git" or "amd" can fire inside longer words.
RELEVANT_KEYWORDS = [
    # Tech stack
    "linux", "debian", "nginx", "traefik", "docker", "postgresql", "redis",
    "node.js", "nodejs", "python", "openssh", "git", "chromium", "openssl",
    "ollama", "llm", "whisper", "matrix", "synapse", "element",

    # Hardware
    "amd", "radeon", "rocm", "fritzbox", "avm",

    # Critical issues
    "critical", "rce", "remote code execution", "zero-day", "0-day",
    "ransomware", "supply chain", "authentication bypass",

    # AI-specific
    "prompt injection", "jailbreak", "model extraction", "adversarial",
    "llm vulnerability", "ai safety", "model poisoning"
]
|
|
|
|
|
|
# Software inventory for CVE matching.
# Each item has a display "name", an optional "version" (hardware entries
# have none) and lowercase "aliases" that check_inventory_match() looks for
# in alert text.
INVENTORY = {
    "operating_systems": [
        {"name": "Debian", "version": "12", "aliases": ["debian", "bookworm"]},
        {"name": "Linux Kernel", "version": "6.1", "aliases": ["linux", "kernel"]},
    ],
    "services": [
        {"name": "OpenSSH", "version": "9.2", "aliases": ["ssh", "openssh", "sshd"]},
        {"name": "Nginx", "version": "1.22", "aliases": ["nginx"]},
        {"name": "Traefik", "version": "2.10", "aliases": ["traefik"]},
        {"name": "Docker", "version": "24", "aliases": ["docker", "containerd"]},
        {"name": "Node.js", "version": "22", "aliases": ["node", "nodejs", "npm"]},
        {"name": "Python", "version": "3.11", "aliases": ["python", "python3"]},
        {"name": "PostgreSQL", "version": "15", "aliases": ["postgres", "postgresql"]},
        {"name": "Redis", "version": "7", "aliases": ["redis"]},
        {"name": "Ollama", "version": "0.1", "aliases": ["ollama", "llama"]},
    ],
    "applications": [
        {"name": "Chromium", "version": "120", "aliases": ["chromium", "chrome"]},
        {"name": "Git", "version": "2.39", "aliases": ["git"]},
        {"name": "OpenSSL", "version": "3.0", "aliases": ["openssl", "ssl", "tls"]},
    ],
    "hardware": [
        {"name": "AMD Radeon RX 5700 XT", "aliases": ["amd", "radeon", "rx5700", "navi", "gfx1010"]},
        {"name": "Fritz!Box", "aliases": ["fritzbox", "fritz", "avm"]},
    ]
}
|
|
|
|
|
|
# --- Database ---
|
|
|
|
def get_db() -> sqlite3.Connection:
    """Open a connection to the sentinel database.

    Ensures the data directory exists first (so a fresh install works) and
    installs ``sqlite3.Row`` for dict-like column access.
    """
    sentinel_dir().mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(sentinel_db_path())
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def init_db() -> None:
    """Create the sentinel schema (``alerts`` + ``runs``) if missing.

    Idempotent: all DDL uses IF NOT EXISTS, so it is safe to call on every
    command invocation.
    """
    conn = get_db()
    # alerts: one row per deduplicated feed entry, keyed by the md5-based id
    # computed in fetch_feed(); runs: one bookkeeping row per scan.
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS alerts (
            id TEXT PRIMARY KEY,
            source TEXT NOT NULL,
            category TEXT,
            title TEXT NOT NULL,
            link TEXT,
            summary TEXT,
            severity TEXT DEFAULT 'info',
            relevant INTEGER DEFAULT 0,
            first_seen TEXT NOT NULL,
            last_seen TEXT NOT NULL,
            seen_count INTEGER DEFAULT 1,
            notified INTEGER DEFAULT 0,
            acknowledged INTEGER DEFAULT 0
        );

        CREATE INDEX IF NOT EXISTS idx_alerts_source ON alerts(source);
        CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity);
        CREATE INDEX IF NOT EXISTS idx_alerts_first_seen ON alerts(first_seen);
        CREATE INDEX IF NOT EXISTS idx_alerts_notified ON alerts(notified);

        CREATE TABLE IF NOT EXISTS runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            total_fetched INTEGER,
            new_alerts INTEGER,
            duplicates INTEGER,
            notified INTEGER
        );
    """)
    conn.commit()
    conn.close()
|
|
|
|
|
|
def add_alert(alert: dict) -> bool:
    """Insert *alert* if unseen; otherwise refresh its recency bookkeeping.

    The dedup key is ``alert["id"]`` (md5 of "source:link", computed in
    fetch_feed()). Title and summary are clamped to 500/1000 characters on
    insert. Timestamps are naive local-time ISO-8601 strings.

    Returns:
        True if the alert was new, False if it was a duplicate.
    """
    conn = get_db()
    now = datetime.now().isoformat()

    cur = conn.execute("SELECT id, seen_count FROM alerts WHERE id = ?", (alert["id"],))
    existing = cur.fetchone()

    if existing:
        # Duplicate: only bump last_seen and the seen counter.
        conn.execute("""
            UPDATE alerts SET last_seen = ?, seen_count = seen_count + 1
            WHERE id = ?
        """, (now, alert["id"]))
        conn.commit()
        conn.close()
        return False
    else:
        conn.execute("""
            INSERT INTO alerts (id, source, category, title, link, summary,
                                severity, relevant, first_seen, last_seen)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            alert["id"],
            alert.get("source", "unknown"),
            alert.get("category", ""),
            alert.get("title", "")[:500],       # clamp to schema budget
            alert.get("link", ""),
            alert.get("summary", "")[:1000],    # clamp to schema budget
            alert.get("severity", "info"),
            1 if alert.get("relevant") else 0,  # store bool as INTEGER flag
            now,
            now
        ))
        conn.commit()
        conn.close()
        return True
|
|
|
|
|
|
def log_run(total: int, new: int, dupes: int, notified: int = 0) -> None:
    """Record one scan's bookkeeping row in the ``runs`` table."""
    row = (datetime.now().isoformat(), total, new, dupes, notified)
    db = get_db()
    db.execute("""
        INSERT INTO runs (timestamp, total_fetched, new_alerts, duplicates, notified)
        VALUES (?, ?, ?, ?, ?)
    """, row)
    db.commit()
    db.close()
|
|
|
|
|
|
def get_stats() -> dict:
    """Collect summary statistics from the sentinel database.

    Returns:
        Dict with ``total_alerts``, ``by_severity`` (severity -> count),
        ``unnotified`` (relevant alerts not yet notified), ``last_24h``
        (alerts first seen within the past day) and ``recent_runs``
        (last 5 rows of ``runs`` as dicts).
    """
    conn = get_db()
    stats = {}

    cur = conn.execute("SELECT COUNT(*) FROM alerts")
    stats["total_alerts"] = cur.fetchone()[0]

    cur = conn.execute("""
        SELECT severity, COUNT(*) as count FROM alerts
        GROUP BY severity ORDER BY count DESC
    """)
    stats["by_severity"] = {row["severity"]: row["count"] for row in cur.fetchall()}

    cur = conn.execute("SELECT COUNT(*) FROM alerts WHERE notified = 0 AND relevant = 1")
    stats["unnotified"] = cur.fetchone()[0]

    # ISO-8601 strings compare lexicographically, so string > works here.
    yesterday = (datetime.now() - timedelta(days=1)).isoformat()
    cur = conn.execute("SELECT COUNT(*) FROM alerts WHERE first_seen > ?", (yesterday,))
    stats["last_24h"] = cur.fetchone()[0]

    cur = conn.execute("SELECT * FROM runs ORDER BY timestamp DESC LIMIT 5")
    stats["recent_runs"] = [dict(row) for row in cur.fetchall()]

    conn.close()
    return stats
|
|
|
|
|
|
def get_unnotified_alerts(min_severity: str = "medium") -> list[dict]:
    """Get relevant, not-yet-notified alerts at or above *min_severity*.

    The query orders by severity (critical first), then recency, and takes
    at most 20 rows; the severity threshold is then applied in Python.
    NOTE(review): because LIMIT 20 runs before the threshold filter,
    qualifying alerts can be dropped when more than 20 are pending —
    confirm this is intended. Unknown *min_severity* values fall back to
    the "medium" threshold.
    """
    # Lower number = more severe; unknown severities rank as "info" (4).
    severity_order = {"critical": 1, "high": 2, "medium": 3, "info": 4}
    min_level = severity_order.get(min_severity, 3)

    conn = get_db()
    cur = conn.execute("""
        SELECT * FROM alerts
        WHERE notified = 0 AND relevant = 1
        ORDER BY
            CASE severity
                WHEN 'critical' THEN 1
                WHEN 'high' THEN 2
                WHEN 'medium' THEN 3
                ELSE 4
            END,
            first_seen DESC
        LIMIT 20
    """)
    alerts = [dict(row) for row in cur.fetchall()]
    conn.close()

    return [a for a in alerts if severity_order.get(a["severity"], 4) <= min_level]
|
|
|
|
|
|
def get_recent_alerts(limit: int = 50) -> list[dict]:
    """Return up to *limit* alerts, newest first by ``first_seen``."""
    conn = get_db()
    rows = conn.execute("""
        SELECT * FROM alerts
        ORDER BY first_seen DESC
        LIMIT ?
    """, (limit,)).fetchall()
    conn.close()
    return [dict(row) for row in rows]
|
|
|
|
|
|
# --- Feed Fetching ---
|
|
|
|
def fetch_feed(name: str, config: dict) -> list[dict]:
    """Fetch and parse one RSS/Atom feed into normalized alert dicts.

    Args:
        name: Feed key from FEEDS (used as the alert source and in the
            dedup id).
        config: Feed config with "url", "category" and optional
            "verify_ssl". NOTE: on an SSLError this dict is mutated
            (verify_ssl=False) and the fetch retried once — callers should
            pass a copy (fetch_all_feeds does).

    Returns:
        Up to 20 entry dicts (id/source/category/title/link/summary/
        published/severity/relevant/fetched_at); [] on any error.
    """
    if not HAS_FEEDPARSER:
        print(f" ⚠️ feedparser not installed", file=sys.stderr)
        return []

    url = config["url"]
    verify_ssl = config.get("verify_ssl", True)

    try:
        headers = {"User-Agent": USER_AGENT}
        response = requests.get(url, headers=headers, timeout=15, verify=verify_ssl)
        response.raise_for_status()

        # Parse the already-fetched bytes so requests controls TLS and
        # timeouts rather than feedparser's own HTTP handling.
        feed = feedparser.parse(response.content)

        # bozo flags any parse problem; only give up if nothing was salvaged.
        if feed.bozo and not feed.entries:
            print(f" ⚠️ {name}: Parse error", file=sys.stderr)
            return []

        entries = []
        for entry in feed.entries[:20]:  # Max 20 per feed
            title = entry.get("title", "No title")
            link = entry.get("link", "")
            summary = entry.get("summary", entry.get("description", ""))[:500]
            published = entry.get("published", entry.get("updated", ""))

            # Check relevance: plain lowercase substring scan of title+summary.
            text_check = f"{title} {summary}".lower()
            is_relevant = any(kw in text_check for kw in RELEVANT_KEYWORDS)

            # Determine severity by keyword; first tier that matches wins.
            severity = "info"
            if any(kw in text_check for kw in ["critical", "rce", "zero-day", "0-day", "ransomware"]):
                severity = "critical"
            elif any(kw in text_check for kw in ["high", "remote", "exploit", "vulnerability"]):
                severity = "high"
            elif any(kw in text_check for kw in ["medium", "moderate", "security"]):
                severity = "medium"

            entries.append({
                # Stable dedup id: md5("source:link"), truncated to 12 hex chars.
                "id": hashlib.md5(f"{name}:{link}".encode()).hexdigest()[:12],
                "source": name,
                "category": config["category"],
                "title": title,
                "link": link,
                "summary": summary[:300],
                "published": published,
                "severity": severity,
                "relevant": is_relevant,
                "fetched_at": datetime.now().isoformat()
            })

        print(f" ✅ {name}: {len(entries)} entries", file=sys.stderr)
        return entries

    except requests.exceptions.SSLError:
        if verify_ssl:
            # Retry exactly once with verification disabled; the guard above
            # prevents infinite recursion on the second failure.
            config["verify_ssl"] = False
            return fetch_feed(name, config)
        return []
    except requests.exceptions.Timeout:
        print(f" ❌ {name}: Timeout", file=sys.stderr)
        return []
    except requests.exceptions.RequestException as e:
        print(f" ❌ {name}: {type(e).__name__}", file=sys.stderr)
        return []
    except Exception as e:
        # Broad catch: one bad feed must never abort the whole scan.
        print(f" ❌ {name}: {e}", file=sys.stderr)
        return []
|
|
|
|
|
|
def fetch_all_feeds() -> tuple[list[dict], int, int]:
    """Fetch every configured feed.

    Returns:
        (entries, successful, failed) — all collected entries, the number
        of feeds that yielded at least one entry, and the number that
        yielded none. Each feed's config is copied so fetch_feed's
        SSL-retry mutation cannot leak back into FEEDS.
    """
    collected: list[dict] = []
    ok = 0
    bad = 0

    for feed_name, feed_cfg in FEEDS.items():
        fetched = fetch_feed(feed_name, dict(feed_cfg))
        if fetched:
            collected.extend(fetched)
            ok += 1
        else:
            bad += 1

    return collected, ok, bad
|
|
|
|
|
|
# --- CVE Matching ---
|
|
|
|
def check_inventory_match(text: str) -> list[dict]:
    """Return inventory items mentioned in *text*.

    Matching is case-insensitive and anchored on word boundaries so that
    short aliases do not fire on substrings of unrelated words (the
    previous plain ``in`` check matched e.g. "git" inside "digital" or
    "legitimate"). At most one match is recorded per inventory item
    (first alias wins).

    Args:
        text: Free text to scan (typically alert title + summary).

    Returns:
        List of dicts with ``category``, ``name``, ``version`` (``None``
        for hardware entries) and ``matched_alias``.
    """
    text_lower = text.lower()
    matches = []

    for category, items in INVENTORY.items():
        for item in items:
            for alias in item.get("aliases", []):
                # \b anchors prevent partial-word hits; re.escape handles
                # aliases containing metacharacters (e.g. "node.js").
                if re.search(rf"\b{re.escape(alias)}\b", text_lower):
                    matches.append({
                        "category": category,
                        "name": item["name"],
                        "version": item.get("version"),
                        "matched_alias": alias
                    })
                    break  # one match per inventory item is enough

    return matches
|
|
|
|
|
|
def analyze_matches(alerts: list[dict]) -> dict:
    """Analyze alerts for matches against the local software inventory.

    Each matching alert is annotated in place with ``inventory_matches``
    and ``match_count``, collected into the result, and counted per
    inventory category.

    Fix: the previous tie-break sorted the severity *string*
    alphabetically, which ranked "medium" below "info"; severities are now
    ranked through an explicit order map (critical > high > medium > info).

    Args:
        alerts: Alert dicts (e.g. from get_recent_alerts()).

    Returns:
        Summary dict with analysis_time, source/relevant/critical counts,
        category_breakdown, and the top critical (10) / relevant (20) alerts.
    """
    # Lower rank = more severe; unknown severities rank with "info".
    severity_rank = {"critical": 0, "high": 1, "medium": 2, "info": 3}

    relevant = []
    critical = []
    category_counts: dict[str, int] = {}

    for alert in alerts:
        text = f"{alert.get('title', '')} {alert.get('summary', '')}"
        matches = check_inventory_match(text)
        if not matches:
            continue

        alert["inventory_matches"] = matches
        alert["match_count"] = len(matches)
        relevant.append(alert)

        if alert.get("severity") == "critical":
            critical.append(alert)

        for match in matches:
            cat = match["category"]
            category_counts[cat] = category_counts.get(cat, 0) + 1

    # Most inventory matches first; ties broken by severity (critical first).
    relevant.sort(key=lambda a: (-a.get("match_count", 0),
                                 severity_rank.get(a.get("severity", "info"), 3)))

    return {
        "analysis_time": datetime.now().isoformat(),
        "source_alerts": len(alerts),
        "relevant_alerts": len(relevant),
        "critical_relevant": len(critical),
        "category_breakdown": category_counts,
        "critical": critical[:10],
        "relevant": relevant[:20],
    }
|
|
|
|
|
|
# --- Report Generation ---
|
|
|
|
def generate_report(data: dict, use_llm: bool = False) -> str:
    """Render a markdown security report from analyze_matches() output.

    Args:
        data: Result dict from analyze_matches() ("relevant", "critical",
            "relevant_alerts", ...).
        use_llm: When True, append an AI-summary section via get_ai_summary().

    Returns:
        The full report as a markdown string. Database stats are queried
        live via get_stats(), independently of *data*.
    """
    now = datetime.now()

    lines = [
        "# 🔒 Security Sentinel Report",
        f"**Generated:** {now.strftime('%Y-%m-%d %H:%M')}",
        ""
    ]

    # Stats (fresh DB query, not derived from *data*)
    stats = get_stats()
    lines.extend([
        "## 📊 Database Stats",
        f"- **Total alerts:** {stats['total_alerts']}",
        f"- **Last 24h:** {stats['last_24h']}",
        f"- **Unnotified:** {stats['unnotified']}",
        ""
    ])

    # Matches
    if data.get("relevant"):
        lines.extend([
            f"## 🎯 Relevant Alerts ({data['relevant_alerts']})",
            ""
        ])

        if data.get("critical"):
            lines.append("### ⚠️ Critical")
            for alert in data["critical"][:5]:
                matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
                lines.extend([
                    f"- **{alert['title'][:80]}**",
                    f" - Source: {alert.get('source', 'unknown')}",
                    f" - Affects: {matches}",
                    ""
                ])

        lines.append("### 📋 Other Relevant")
        for alert in data["relevant"][:10]:
            # Skip alerts already listed in the critical section above.
            if alert in data.get("critical", []):
                continue
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            lines.append(f"- {alert['title'][:60]}... ({matches})")

        lines.append("")

    # AI Summary (optional; get_ai_summary degrades to an error note)
    if use_llm and data.get("relevant"):
        lines.extend(["## 🤖 AI Summary", ""])
        summary = get_ai_summary(data["relevant"][:10])
        lines.extend([summary, ""])

    # Actions
    lines.extend([
        "## 📝 Recommended Actions",
        ""
    ])

    if data.get("critical"):
        lines.append("1. Review critical alerts and check for available patches")

    if stats["unnotified"] > 10:
        # NOTE(review): hard-numbered "2." — appears without a "1." when
        # there are no critical alerts; confirm whether that is acceptable.
        lines.append(f"2. Process {stats['unnotified']} unnotified alerts")

    if not data.get("critical") and stats["unnotified"] <= 10:
        lines.append("✅ No immediate actions required")

    return "\n".join(lines)
|
|
|
|
|
|
def get_ai_summary(alerts: list[dict]) -> str:
    """Summarize alerts via the local LLM (Ollama-style /api/generate).

    The prompt asks (in German) for a short summary of at most 5 sentences,
    focused on what is critical and what needs action.

    Returns:
        The model's summary text; a parenthesized German fallback string
        when the request fails or the LLM returns a non-200 status, so
        callers can embed the result unconditionally.
    """
    if not alerts:
        return "No alerts to summarize."

    # One "- [SEVERITY] title" line per alert; cap at 15 to bound prompt size.
    alert_text = "\n".join([
        f"- [{a.get('severity', 'info').upper()}] {a.get('title', '')}"
        for a in alerts[:15]
    ])

    prompt = f"""Du bist ein Security-Analyst. Fasse diese Security-Alerts kurz zusammen (max 5 Sätze, Deutsch).
Fokus: Was ist kritisch? Was erfordert Aktion?

Alerts:
{alert_text}

Zusammenfassung:"""

    try:
        response = requests.post(
            llm_url(),
            json={
                "model": llm_model(),
                "prompt": prompt,
                "stream": False,
                # Low temperature for a factual summary; cap output tokens.
                "options": {"temperature": 0.3, "num_predict": 300}
            },
            timeout=60
        )

        if response.status_code == 200:
            return response.json().get("response", "").strip()
    except Exception as e:
        # Best-effort: report the failure in-band instead of raising.
        return f"(LLM nicht erreichbar: {e})"

    return "(Zusammenfassung nicht verfügbar)"
|
|
|
|
|
|
# --- Commands ---
|
|
|
|
def cmd_scan(include_nmap: bool = False) -> None:
    """Scan all security feeds, dedup into SQLite, write a JSON snapshot.

    Progress goes to stderr; the snapshot of *new* entries is written to
    ``feeds_dir()/alerts_latest.json``.

    Args:
        include_nmap: Accepted for CLI compatibility (--nmap).
            NOTE(review): currently unused — the network-scan step appears
            not to have been ported in the consolidation; confirm before
            advertising --nmap as functional.
    """
    init_db()

    print(f"🛡️ Sentinel Scan — {datetime.now().strftime('%Y-%m-%d %H:%M')}", file=sys.stderr)
    print(f" Fetching {len(FEEDS)} feeds...", file=sys.stderr)

    all_entries, successful, failed = fetch_all_feeds()

    print(f"\n Feeds: {successful}/{successful+failed} OK", file=sys.stderr)

    # Deduplicate via SQLite (add_alert returns True only for unseen ids)
    print("\n🔍 Deduplicating...", file=sys.stderr)
    new_count = 0
    dupe_count = 0
    new_entries = []

    for entry in all_entries:
        if add_alert(entry):
            new_entries.append(entry)
            new_count += 1
        else:
            dupe_count += 1

    # Log run
    log_run(len(all_entries), new_count, dupe_count, 0)

    # Stats over newly inserted entries only
    relevant_new = sum(1 for e in new_entries if e.get("relevant"))
    critical_new = sum(1 for e in new_entries if e.get("severity") == "critical")

    print(f"\n📊 Summary:", file=sys.stderr)
    print(f" Fetched: {len(all_entries)}", file=sys.stderr)
    print(f" New: {new_count} ({relevant_new} relevant, {critical_new} critical)", file=sys.stderr)
    print(f" Duplicates: {dupe_count}", file=sys.stderr)

    # Save a machine-readable snapshot of this run (new entries only)
    output = {
        "fetched_at": datetime.now().isoformat(),
        "stats": {
            "total_fetched": len(all_entries),
            "new_alerts": new_count,
            "duplicates": dupe_count,
            "relevant": relevant_new,
            "critical": critical_new
        },
        "entries": new_entries
    }

    output_file = feeds_dir() / "alerts_latest.json"
    output_file.write_text(json.dumps(output, indent=2))
    print(f" Output: {output_file}", file=sys.stderr)
|
|
|
|
|
|
def cmd_matches() -> None:
    """Show alerts from the DB that match the local software inventory.

    Reads the 100 most recent alerts, prints critical and other relevant
    matches to stdout, and saves the full analysis as a timestamped JSON
    report under reports_dir().
    """
    alerts = get_recent_alerts(100)

    if not alerts:
        print("No alerts in database. Run 'cortex sentinel scan' first.")
        return

    data = analyze_matches(alerts)

    print(f"🎯 Inventory Matches ({data['relevant_alerts']} of {data['source_alerts']})\n")

    if data.get("critical"):
        print("⚠️ CRITICAL:\n")
        for alert in data["critical"][:5]:
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            print(f" • {alert['title'][:70]}")
            print(f" Affects: {matches}\n")

    if data.get("relevant"):
        print("\n📋 Other relevant:\n")
        for alert in data["relevant"][:10]:
            # Don't repeat alerts already shown in the critical section.
            if alert in data.get("critical", []):
                continue
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            print(f" • {alert['title'][:60]}... ({matches})")

    if data.get("category_breakdown"):
        print(f"\n📊 By category: {data['category_breakdown']}")

    # Save the analysis for later inspection / tooling
    report_file = reports_dir() / f"match_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    report_file.write_text(json.dumps(data, indent=2))
|
|
|
|
|
|
def cmd_report(use_llm: bool = False) -> None:
    """Generate a markdown security report, save it and print it.

    The report is written to a timestamped file under reports_dir() and a
    ``report_latest.md`` symlink is refreshed to point at it.

    Args:
        use_llm: When True, include an AI-generated summary section.
    """
    alerts = get_recent_alerts(100)
    data = analyze_matches(alerts)

    report = generate_report(data, use_llm)

    # Save the timestamped report
    report_file = reports_dir() / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
    report_file.write_text(report)

    # Refresh the "latest" symlink. missing_ok=True (3.8+) covers both a
    # missing path and a dangling symlink, replacing the manual
    # exists()/is_symlink() check. The symlink target is relative so the
    # reports directory stays relocatable.
    latest = reports_dir() / "report_latest.md"
    latest.unlink(missing_ok=True)
    latest.symlink_to(report_file.name)

    print(f"✅ Report saved: {report_file}", file=sys.stderr)
    print(report)
|
|
|
|
|
|
def cmd_stats() -> None:
    """Print sentinel database statistics to stdout."""
    init_db()
    stats = get_stats()

    print("📊 Sentinel Stats\n")
    print(f"Total alerts: {stats['total_alerts']}")
    print(f"Last 24h: {stats['last_24h']}")
    print(f"Unnotified: {stats['unnotified']}")

    severities = stats.get("by_severity")
    if severities:
        print(f"\nBy severity:")
        for sev, count in severities.items():
            print(f" {sev}: {count}")

    runs = stats.get("recent_runs")
    if runs:
        print(f"\nRecent runs:")
        for run in runs[:3]:
            ts = run.get("timestamp", "")[:16]
            print(f" {ts} — {run.get('new_alerts', 0)} new, {run.get('duplicates', 0)} dupes")
|
|
|
|
|
|
# --- Main ---
|
|
|
|
def main():
    """CLI entry point: parse the sub-command and dispatch to a cmd_*()."""
    parser = argparse.ArgumentParser(
        description='Security Feed Aggregation and CVE Matching',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Commands:
  scan      Fetch security feeds and update database
  matches   Show alerts matching local inventory
  report    Generate markdown security report
  stats     Show database statistics

Examples:
  cortex sentinel scan
  cortex sentinel matches
  cortex sentinel report --llm
'''
    )

    sub = parser.add_subparsers(dest='command')

    # scan
    scan_p = sub.add_parser('scan', help='Fetch security feeds')
    scan_p.add_argument('--nmap', action='store_true',
                        help='Include network scan (slow)')

    # matches
    sub.add_parser('matches', help='Show inventory matches')

    # report
    report_p = sub.add_parser('report', help='Generate report')
    report_p.add_argument('--llm', action='store_true',
                          help='Include AI summary')

    # stats
    sub.add_parser('stats', help='Show database stats')

    args = parser.parse_args()

    # getattr() with a default keeps dispatch safe even if a sub-parser
    # doesn't define the flag attribute.
    if args.command == 'scan':
        cmd_scan(getattr(args, 'nmap', False))
    elif args.command == 'matches':
        cmd_matches()
    elif args.command == 'report':
        cmd_report(getattr(args, 'llm', False))
    elif args.command == 'stats':
        cmd_stats()
    else:
        # No sub-command given: show usage instead of failing.
        parser.print_help()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point (the module is also usable via the cortex CLI).
    main()
|