#!/usr/bin/env python3
"""Cortex Sentinel — Security Feed Aggregation and CVE Matching.

Consolidated from ~/clawd/scripts/sentinel/
(rss-fetch.py, db.py, cve-match.py, report-gen.py)

Features:
- RSS security feed aggregation
- SQLite-based deduplication
- CVE matching against local inventory
- Report generation (markdown + AI summary)

Usage:
    cortex sentinel scan [--nmap]
    cortex sentinel report [--llm]
    cortex sentinel matches
    cortex sentinel stats
"""

import argparse
import hashlib
import json
import os
import re
import sqlite3
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Optional

import requests

from cortex.config import cortex_home

# Try to import feedparser (optional dependency)
try:
    import feedparser
    HAS_FEEDPARSER = True
except ImportError:
    HAS_FEEDPARSER = False

# Disable SSL warnings for problematic feeds (some feeds below are fetched
# with verify_ssl=False on purpose; see FEEDS config)
try:
    import urllib3
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError:
    pass


# --- Configuration ---

def _env(key: str, default: str = '') -> str:
    """Read an environment variable with a default."""
    return os.environ.get(key, default)


def sentinel_dir() -> Path:
    """Base directory for sentinel data."""
    return cortex_home() / "sentinel"


def sentinel_db_path() -> Path:
    """Path to sentinel SQLite database."""
    return sentinel_dir() / "sentinel.db"


def feeds_dir() -> Path:
    """Directory for feed output files (created on demand)."""
    d = sentinel_dir() / "feeds"
    d.mkdir(parents=True, exist_ok=True)
    return d


def reports_dir() -> Path:
    """Directory for report output files (created on demand)."""
    d = sentinel_dir() / "reports"
    d.mkdir(parents=True, exist_ok=True)
    return d


def llm_url() -> str:
    """LLM API URL for AI summaries (Ollama-style /api/generate endpoint)."""
    return _env('CORTEX_LLM_URL', 'http://localhost:11434/api/generate')


def llm_model() -> str:
    """LLM model to use for summaries."""
    return _env('CORTEX_LLM_MODEL', 'mistral:7b')


# User agent for HTTP requests (some feeds block default python-requests UA)
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0"


# --- Feed Configuration ---

# Feed registry: name -> {url, category, optional verify_ssl flag}.
# verify_ssl=False entries have known certificate issues; they are fetched
# without verification (urllib3 warnings suppressed above).
FEEDS = {
    # Security News
    "bleepingcomputer": {
        "url": "https://www.bleepingcomputer.com/feed/",
        "category": "security-news"
    },
    "hackernews": {
        "url": "https://feeds.feedburner.com/TheHackersNews",
        "category": "security-news"
    },
    "darkreading": {
        "url": "https://www.darkreading.com/rss.xml",
        "category": "security-news"
    },
    "schneier": {
        "url": "https://www.schneier.com/feed/atom/",
        "category": "security-news"
    },
    "securityweek": {
        "url": "https://www.securityweek.com/feed/",
        "category": "security-news"
    },
    # CVE/Vulnerability Feeds
    "nvd-recent": {
        "url": "https://nvd.nist.gov/feeds/xml/cve/misc/nvd-rss.xml",
        "category": "cve",
        "verify_ssl": False
    },
    "cisa-alerts": {
        "url": "https://www.cisa.gov/cybersecurity-advisories/all.xml",
        "category": "cve",
        "verify_ssl": False
    },
    # AI/ML Security
    "huggingface-blog": {
        "url": "https://huggingface.co/blog/feed.xml",
        "category": "ai-security"
    },
    "google-ai-blog": {
        "url": "https://blog.google/technology/ai/rss/",
        "category": "ai-security"
    },
    # Exploit Databases
    "exploitdb": {
        "url": "https://www.exploit-db.com/rss.xml",
        "category": "exploits",
        "verify_ssl": False
    },
}

# Keywords that indicate relevance to our infrastructure
RELEVANT_KEYWORDS = [
    # Tech stack
    "linux", "debian", "nginx", "traefik", "docker", "postgresql",
    "redis", "node.js", "nodejs", "python", "openssh", "git",
    "chromium", "openssl", "ollama", "llm", "whisper",
    "matrix", "synapse", "element",
    # Hardware
    "amd", "radeon", "rocm", "fritzbox", "avm",
    # Critical issues
    "critical", "rce", "remote code execution", "zero-day", "0-day",
    "ransomware", "supply chain", "authentication bypass",
    # AI-specific
    "prompt injection", "jailbreak", "model extraction", "adversarial",
    "llm vulnerability", "ai safety", "model poisoning"
]

# Software inventory for CVE matching.
# NOTE(review): matching is plain substring on the aliases (see
# check_inventory_match), so short aliases like "git" or "ssl" can
# over-match inside unrelated words — tolerated for recall.
INVENTORY = {
    "operating_systems": [
        {"name": "Debian", "version": "12", "aliases": ["debian", "bookworm"]},
        {"name": "Linux Kernel", "version": "6.1", "aliases": ["linux", "kernel"]},
    ],
    "services": [
        {"name": "OpenSSH", "version": "9.2", "aliases": ["ssh", "openssh", "sshd"]},
        {"name": "Nginx", "version": "1.22", "aliases": ["nginx"]},
        {"name": "Traefik", "version": "2.10", "aliases": ["traefik"]},
        {"name": "Docker", "version": "24", "aliases": ["docker", "containerd"]},
        {"name": "Node.js", "version": "22", "aliases": ["node", "nodejs", "npm"]},
        {"name": "Python", "version": "3.11", "aliases": ["python", "python3"]},
        {"name": "PostgreSQL", "version": "15", "aliases": ["postgres", "postgresql"]},
        {"name": "Redis", "version": "7", "aliases": ["redis"]},
        {"name": "Ollama", "version": "0.1", "aliases": ["ollama", "llama"]},
    ],
    "applications": [
        {"name": "Chromium", "version": "120", "aliases": ["chromium", "chrome"]},
        {"name": "Git", "version": "2.39", "aliases": ["git"]},
        {"name": "OpenSSL", "version": "3.0", "aliases": ["openssl", "ssl", "tls"]},
    ],
    "hardware": [
        {"name": "AMD Radeon RX 5700 XT",
         "aliases": ["amd", "radeon", "rx5700", "navi", "gfx1010"]},
        {"name": "Fritz!Box", "aliases": ["fritzbox", "fritz", "avm"]},
    ]
}

# Canonical severity ordering (lower rank = more severe). Used for sorting;
# unknown severities rank as "info".
_SEVERITY_RANK = {"critical": 0, "high": 1, "medium": 2, "info": 3}


# --- Database ---

def get_db() -> sqlite3.Connection:
    """Get database connection with row factory.

    Ensures the sentinel directory exists so the DB file can be created.
    """
    sentinel_dir().mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(sentinel_db_path())
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> None:
    """Initialize database schema (idempotent — IF NOT EXISTS throughout)."""
    conn = get_db()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS alerts (
            id TEXT PRIMARY KEY,
            source TEXT NOT NULL,
            category TEXT,
            title TEXT NOT NULL,
            link TEXT,
            summary TEXT,
            severity TEXT DEFAULT 'info',
            relevant INTEGER DEFAULT 0,
            first_seen TEXT NOT NULL,
            last_seen TEXT NOT NULL,
            seen_count INTEGER DEFAULT 1,
            notified INTEGER DEFAULT 0,
            acknowledged INTEGER DEFAULT 0
        );
        CREATE INDEX IF NOT EXISTS idx_alerts_source ON alerts(source);
        CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity);
        CREATE INDEX IF NOT EXISTS idx_alerts_first_seen ON alerts(first_seen);
        CREATE INDEX IF NOT EXISTS idx_alerts_notified ON alerts(notified);
        CREATE TABLE IF NOT EXISTS runs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            total_fetched INTEGER,
            new_alerts INTEGER,
            duplicates INTEGER,
            notified INTEGER
        );
    """)
    conn.commit()
    conn.close()


def add_alert(alert: dict) -> bool:
    """Add alert if new, update if exists. Returns True if new.

    Existing alerts (matched by id, a hash of source+link) only get their
    last_seen / seen_count bumped; titles and summaries are truncated on
    insert to bound row size.
    """
    conn = get_db()
    now = datetime.now().isoformat()
    cur = conn.execute("SELECT id, seen_count FROM alerts WHERE id = ?", (alert["id"],))
    existing = cur.fetchone()
    if existing:
        conn.execute("""
            UPDATE alerts SET last_seen = ?, seen_count = seen_count + 1
            WHERE id = ?
        """, (now, alert["id"]))
        conn.commit()
        conn.close()
        return False
    else:
        conn.execute("""
            INSERT INTO alerts (id, source, category, title, link, summary,
                                severity, relevant, first_seen, last_seen)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            alert["id"],
            alert.get("source", "unknown"),
            alert.get("category", ""),
            alert.get("title", "")[:500],
            alert.get("link", ""),
            alert.get("summary", "")[:1000],
            alert.get("severity", "info"),
            1 if alert.get("relevant") else 0,
            now, now
        ))
        conn.commit()
        conn.close()
        return True


def log_run(total: int, new: int, dupes: int, notified: int = 0) -> None:
    """Log a sentinel run."""
    conn = get_db()
    conn.execute("""
        INSERT INTO runs (timestamp, total_fetched, new_alerts, duplicates, notified)
        VALUES (?, ?, ?, ?, ?)
    """, (datetime.now().isoformat(), total, new, dupes, notified))
    conn.commit()
    conn.close()


def get_stats() -> dict:
    """Get database statistics.

    Returns a dict with total_alerts, by_severity, unnotified (relevant
    alerts not yet notified), last_24h, and the 5 most recent runs.
    """
    conn = get_db()
    stats = {}
    cur = conn.execute("SELECT COUNT(*) FROM alerts")
    stats["total_alerts"] = cur.fetchone()[0]
    cur = conn.execute("""
        SELECT severity, COUNT(*) as count FROM alerts
        GROUP BY severity ORDER BY count DESC
    """)
    stats["by_severity"] = {row["severity"]: row["count"] for row in cur.fetchall()}
    cur = conn.execute("SELECT COUNT(*) FROM alerts WHERE notified = 0 AND relevant = 1")
    stats["unnotified"] = cur.fetchone()[0]
    yesterday = (datetime.now() - timedelta(days=1)).isoformat()
    cur = conn.execute("SELECT COUNT(*) FROM alerts WHERE first_seen > ?", (yesterday,))
    stats["last_24h"] = cur.fetchone()[0]
    cur = conn.execute("SELECT * FROM runs ORDER BY timestamp DESC LIMIT 5")
    stats["recent_runs"] = [dict(row) for row in cur.fetchall()]
    conn.close()
    return stats


def get_unnotified_alerts(min_severity: str = "medium") -> list[dict]:
    """Get relevant alerts that haven't been notified yet.

    SQL orders by severity then recency and caps at 20 rows; the
    min_severity threshold is applied in Python afterwards.
    """
    severity_order = {"critical": 1, "high": 2, "medium": 3, "info": 4}
    min_level = severity_order.get(min_severity, 3)
    conn = get_db()
    cur = conn.execute("""
        SELECT * FROM alerts
        WHERE notified = 0 AND relevant = 1
        ORDER BY CASE severity
            WHEN 'critical' THEN 1
            WHEN 'high' THEN 2
            WHEN 'medium' THEN 3
            ELSE 4
        END, first_seen DESC
        LIMIT 20
    """)
    alerts = [dict(row) for row in cur.fetchall()]
    conn.close()
    return [a for a in alerts if severity_order.get(a["severity"], 4) <= min_level]


def get_recent_alerts(limit: int = 50) -> list[dict]:
    """Get recent alerts from database, newest first."""
    conn = get_db()
    cur = conn.execute("""
        SELECT * FROM alerts ORDER BY first_seen DESC LIMIT ?
    """, (limit,))
    alerts = [dict(row) for row in cur.fetchall()]
    conn.close()
    return alerts


# --- Feed Fetching ---

def fetch_feed(name: str, config: dict) -> list[dict]:
    """Fetch a single RSS feed and classify its entries.

    Returns up to 20 normalized entry dicts (id, source, category, title,
    link, summary, published, severity, relevant, fetched_at). Returns []
    on any error; diagnostics go to stderr. On an SSL error the fetch is
    retried once without certificate verification (config is a per-call
    copy, so the downgrade does not persist across runs).
    """
    if not HAS_FEEDPARSER:
        print(" ⚠️ feedparser not installed", file=sys.stderr)
        return []
    url = config["url"]
    verify_ssl = config.get("verify_ssl", True)
    try:
        headers = {"User-Agent": USER_AGENT}
        response = requests.get(url, headers=headers, timeout=15, verify=verify_ssl)
        response.raise_for_status()
        feed = feedparser.parse(response.content)
        # bozo flags malformed XML; tolerate it if entries were still parsed
        if feed.bozo and not feed.entries:
            print(f" ⚠️ {name}: Parse error", file=sys.stderr)
            return []
        entries = []
        for entry in feed.entries[:20]:  # Max 20 per feed
            title = entry.get("title", "No title")
            link = entry.get("link", "")
            summary = entry.get("summary", entry.get("description", ""))[:500]
            published = entry.get("published", entry.get("updated", ""))
            # Check relevance against our infrastructure keywords
            text_check = f"{title} {summary}".lower()
            is_relevant = any(kw in text_check for kw in RELEVANT_KEYWORDS)
            # Determine severity by keyword heuristic (first match wins)
            severity = "info"
            if any(kw in text_check for kw in ["critical", "rce", "zero-day", "0-day", "ransomware"]):
                severity = "critical"
            elif any(kw in text_check for kw in ["high", "remote", "exploit", "vulnerability"]):
                severity = "high"
            elif any(kw in text_check for kw in ["medium", "moderate", "security"]):
                severity = "medium"
            entries.append({
                # MD5 is fine here: dedup key, not a security boundary
                "id": hashlib.md5(f"{name}:{link}".encode()).hexdigest()[:12],
                "source": name,
                "category": config["category"],
                "title": title,
                "link": link,
                "summary": summary[:300],
                "published": published,
                "severity": severity,
                "relevant": is_relevant,
                "fetched_at": datetime.now().isoformat()
            })
        print(f" ✅ {name}: {len(entries)} entries", file=sys.stderr)
        return entries
    except requests.exceptions.SSLError:
        if verify_ssl:
            # Fix: announce the verification downgrade instead of doing it silently
            print(f" ⚠️ {name}: SSL error — retrying without verification", file=sys.stderr)
            config["verify_ssl"] = False
            return fetch_feed(name, config)
        return []
    except requests.exceptions.Timeout:
        print(f" ❌ {name}: Timeout", file=sys.stderr)
        return []
    except requests.exceptions.RequestException as e:
        print(f" ❌ {name}: {type(e).__name__}", file=sys.stderr)
        return []
    except Exception as e:
        print(f" ❌ {name}: {e}", file=sys.stderr)
        return []


def fetch_all_feeds() -> tuple[list[dict], int, int]:
    """Fetch all configured feeds. Returns (entries, successful, failed).

    Each feed gets a copy of its config so fetch_feed's SSL-downgrade
    retry cannot mutate the global FEEDS table.
    """
    all_entries = []
    successful = 0
    failed = 0
    for name, config in FEEDS.items():
        entries = fetch_feed(name, config.copy())
        if entries:
            all_entries.extend(entries)
            successful += 1
        else:
            failed += 1
    return all_entries, successful, failed


# --- CVE Matching ---

def check_inventory_match(text: str) -> list[dict]:
    """Check if text mentions any inventory items (case-insensitive substring).

    Returns at most one match per inventory item (first alias hit wins).
    """
    text_lower = text.lower()
    matches = []
    for category, items in INVENTORY.items():
        for item in items:
            for alias in item.get("aliases", []):
                if alias in text_lower:
                    matches.append({
                        "category": category,
                        "name": item["name"],
                        "version": item.get("version"),
                        "matched_alias": alias
                    })
                    break  # one match per item is enough
    return matches


def analyze_matches(alerts: list[dict]) -> dict:
    """Analyze alerts for inventory matches.

    Annotates matching alerts in place with inventory_matches/match_count
    and returns a summary dict with critical (top 10) and relevant (top 20)
    alerts plus per-category match counts.
    """
    relevant = []
    critical = []
    category_counts = {}
    for alert in alerts:
        text = f"{alert.get('title', '')} {alert.get('summary', '')}"
        matches = check_inventory_match(text)
        if matches:
            alert["inventory_matches"] = matches
            alert["match_count"] = len(matches)
            relevant.append(alert)
            if alert.get("severity") == "critical":
                critical.append(alert)
            for match in matches:
                cat = match["category"]
                category_counts[cat] = category_counts.get(cat, 0) + 1
    # Fix: previously the severity tiebreak sorted the severity STRING
    # alphabetically, which put 'info' before 'medium'. Use the explicit
    # rank so order is critical < high < medium < info.
    relevant.sort(key=lambda x: (-x.get("match_count", 0),
                                 _SEVERITY_RANK.get(x.get("severity", "info"), 3)))
    return {
        "analysis_time": datetime.now().isoformat(),
        "source_alerts": len(alerts),
        "relevant_alerts": len(relevant),
        "critical_relevant": len(critical),
        "category_breakdown": category_counts,
        "critical": critical[:10],
        "relevant": relevant[:20],
    }


# --- Report Generation ---

def generate_report(data: dict, use_llm: bool = False) -> str:
    """Generate markdown security report from an analyze_matches() result.

    Sections: DB stats, relevant/critical alerts, optional AI summary
    (use_llm), and recommended actions.
    """
    now = datetime.now()
    lines = [
        "# 🔒 Security Sentinel Report",
        f"**Generated:** {now.strftime('%Y-%m-%d %H:%M')}",
        ""
    ]
    # Stats
    stats = get_stats()
    lines.extend([
        "## 📊 Database Stats",
        f"- **Total alerts:** {stats['total_alerts']}",
        f"- **Last 24h:** {stats['last_24h']}",
        f"- **Unnotified:** {stats['unnotified']}",
        ""
    ])
    # Matches
    if data.get("relevant"):
        lines.extend([
            f"## 🎯 Relevant Alerts ({data['relevant_alerts']})",
            ""
        ])
        if data.get("critical"):
            lines.append("### ⚠️ Critical")
            for alert in data["critical"][:5]:
                matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
                lines.extend([
                    f"- **{alert['title'][:80]}**",
                    f" - Source: {alert.get('source', 'unknown')}",
                    f" - Affects: {matches}",
                    ""
                ])
        lines.append("### 📋 Other Relevant")
        for alert in data["relevant"][:10]:
            if alert in data.get("critical", []):
                continue  # already listed under Critical
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            lines.append(f"- {alert['title'][:60]}... ({matches})")
        lines.append("")
    # AI Summary
    if use_llm and data.get("relevant"):
        lines.extend(["## 🤖 AI Summary", ""])
        summary = get_ai_summary(data["relevant"][:10])
        lines.extend([summary, ""])
    # Actions
    lines.extend([
        "## 📝 Recommended Actions",
        ""
    ])
    if data.get("critical"):
        lines.append("1. Review critical alerts and check for available patches")
    if stats["unnotified"] > 10:
        lines.append(f"2. Process {stats['unnotified']} unnotified alerts")
    if not data.get("critical") and stats["unnotified"] <= 10:
        lines.append("✅ No immediate actions required")
    return "\n".join(lines)


def get_ai_summary(alerts: list[dict]) -> str:
    """Get AI summary of alerts via the configured LLM endpoint.

    Best-effort: returns a placeholder string (not an exception) when the
    LLM is unreachable or responds with a non-200 status. The prompt is
    intentionally German (operator-facing output language).
    """
    if not alerts:
        return "No alerts to summarize."
    alert_text = "\n".join([
        f"- [{a.get('severity', 'info').upper()}] {a.get('title', '')}"
        for a in alerts[:15]
    ])
    prompt = f"""Du bist ein Security-Analyst. Fasse diese Security-Alerts kurz zusammen (max 5 Sätze, Deutsch).
Fokus: Was ist kritisch? Was erfordert Aktion?

Alerts:
{alert_text}

Zusammenfassung:"""
    try:
        response = requests.post(
            llm_url(),
            json={
                "model": llm_model(),
                "prompt": prompt,
                "stream": False,
                "options": {"temperature": 0.3, "num_predict": 300}
            },
            timeout=60
        )
        if response.status_code == 200:
            return response.json().get("response", "").strip()
    except Exception as e:
        # Network boundary: any failure degrades to a placeholder message
        return f"(LLM nicht erreichbar: {e})"
    return "(Zusammenfassung nicht verfügbar)"


# --- Commands ---

def cmd_scan(include_nmap: bool = False) -> None:
    """Scan security feeds and update database.

    include_nmap is accepted for CLI compatibility (--nmap) but is not
    yet implemented — reserved for a future network scan step.
    """
    init_db()
    print(f"🛡️ Sentinel Scan — {datetime.now().strftime('%Y-%m-%d %H:%M')}", file=sys.stderr)
    print(f" Fetching {len(FEEDS)} feeds...", file=sys.stderr)
    all_entries, successful, failed = fetch_all_feeds()
    print(f"\n Feeds: {successful}/{successful+failed} OK", file=sys.stderr)
    # Deduplicate via SQLite
    print("\n🔍 Deduplicating...", file=sys.stderr)
    new_count = 0
    dupe_count = 0
    new_entries = []
    for entry in all_entries:
        if add_alert(entry):
            new_entries.append(entry)
            new_count += 1
        else:
            dupe_count += 1
    # Log run
    log_run(len(all_entries), new_count, dupe_count, 0)
    # Stats
    relevant_new = sum(1 for e in new_entries if e.get("relevant"))
    critical_new = sum(1 for e in new_entries if e.get("severity") == "critical")
    print(f"\n📊 Summary:", file=sys.stderr)
    print(f" Fetched: {len(all_entries)}", file=sys.stderr)
    print(f" New: {new_count} ({relevant_new} relevant, {critical_new} critical)", file=sys.stderr)
    print(f" Duplicates: {dupe_count}", file=sys.stderr)
    # Save to file
    output = {
        "fetched_at": datetime.now().isoformat(),
        "stats": {
            "total_fetched": len(all_entries),
            "new_alerts": new_count,
            "duplicates": dupe_count,
            "relevant": relevant_new,
            "critical": critical_new
        },
        "entries": new_entries
    }
    output_file = feeds_dir() / "alerts_latest.json"
    output_file.write_text(json.dumps(output, indent=2))
    print(f" Output: {output_file}", file=sys.stderr)


def cmd_matches() -> None:
    """Show CVE matches against inventory and save a JSON report."""
    alerts = get_recent_alerts(100)
    if not alerts:
        print("No alerts in database. Run 'cortex sentinel scan' first.")
        return
    data = analyze_matches(alerts)
    print(f"🎯 Inventory Matches ({data['relevant_alerts']} of {data['source_alerts']})\n")
    if data.get("critical"):
        print("⚠️ CRITICAL:\n")
        for alert in data["critical"][:5]:
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            print(f" • {alert['title'][:70]}")
            print(f" Affects: {matches}\n")
    if data.get("relevant"):
        print("\n📋 Other relevant:\n")
        for alert in data["relevant"][:10]:
            if alert in data.get("critical", []):
                continue  # already shown in the CRITICAL section
            matches = ", ".join(m["name"] for m in alert.get("inventory_matches", []))
            print(f" • {alert['title'][:60]}... ({matches})")
    if data.get("category_breakdown"):
        print(f"\n📊 By category: {data['category_breakdown']}")
    # Save
    report_file = reports_dir() / f"match_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    report_file.write_text(json.dumps(data, indent=2))


def cmd_report(use_llm: bool = False) -> None:
    """Generate security report, save it, and maintain a 'latest' symlink."""
    alerts = get_recent_alerts(100)
    data = analyze_matches(alerts)
    report = generate_report(data, use_llm)
    # Save
    report_file = reports_dir() / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
    report_file.write_text(report)
    # Symlink latest (is_symlink check also catches dangling links)
    latest = reports_dir() / "report_latest.md"
    if latest.exists() or latest.is_symlink():
        latest.unlink()
    latest.symlink_to(report_file.name)
    print(f"✅ Report saved: {report_file}", file=sys.stderr)
    print(report)


def cmd_stats() -> None:
    """Show database statistics."""
    init_db()
    stats = get_stats()
    print("📊 Sentinel Stats\n")
    print(f"Total alerts: {stats['total_alerts']}")
    print(f"Last 24h: {stats['last_24h']}")
    print(f"Unnotified: {stats['unnotified']}")
    if stats.get("by_severity"):
        print(f"\nBy severity:")
        for sev, count in stats["by_severity"].items():
            print(f" {sev}: {count}")
    if stats.get("recent_runs"):
        print(f"\nRecent runs:")
        for run in stats["recent_runs"][:3]:
            ts = run.get("timestamp", "")[:16]
            print(f" {ts} — {run.get('new_alerts', 0)} new, {run.get('duplicates', 0)} dupes")


# --- Main ---

def main():
    """CLI entry point: dispatch scan/matches/report/stats subcommands."""
    parser = argparse.ArgumentParser(
        description='Security Feed Aggregation and CVE Matching',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Commands:
  scan      Fetch security feeds and update database
  matches   Show alerts matching local inventory
  report    Generate markdown security report
  stats     Show database statistics

Examples:
  cortex sentinel scan
  cortex sentinel matches
  cortex sentinel report --llm
'''
    )
    sub = parser.add_subparsers(dest='command')
    # scan
    scan_p = sub.add_parser('scan', help='Fetch security feeds')
    scan_p.add_argument('--nmap', action='store_true', help='Include network scan (slow)')
    # matches
    sub.add_parser('matches', help='Show inventory matches')
    # report
    report_p = sub.add_parser('report', help='Generate report')
    report_p.add_argument('--llm', action='store_true', help='Include AI summary')
    # stats
    sub.add_parser('stats', help='Show database stats')
    args = parser.parse_args()
    if args.command == 'scan':
        cmd_scan(getattr(args, 'nmap', False))
    elif args.command == 'matches':
        cmd_matches()
    elif args.command == 'report':
        cmd_report(getattr(args, 'llm', False))
    elif args.command == 'stats':
        cmd_stats()
    else:
        parser.print_help()


if __name__ == '__main__':
    main()