darkplex-core/cortex/knowledge_extractor.py
Claudia fd7d75c0ed
Some checks failed
Tests / test (push) Failing after 2s
Merge darkplex-core into cortex — unified intelligence layer v0.2.0
- Merged all unique darkplex-core modules into cortex:
  - intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup, temporal, llm_extractor, loop)
  - governance/ subfolder (policy engine, risk scorer, evidence, enforcer, report generator)
  - entity_manager.py, knowledge_extractor.py
- Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.'
- Added 'darkplex' CLI alias alongside 'cortex'
- Package renamed to darkplex-core v0.2.0
- 405 tests passing (was 234)
- 14 new test files covering all merged modules
2026-02-12 08:43:02 +01:00

345 lines
11 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Smart Extractor — Extract entities from NATS events and update knowledge graph.
Part of Level 4.4 AGI Roadmap.
Usage:
smart-extractor.py --last 100 — Process last N events
smart-extractor.py --since 6h — Process events from last 6 hours
smart-extractor.py --dry-run — Show what would be extracted without saving
"""
import sys
import os
import json
import subprocess
import re
import time
import logging
from pathlib import Path
from datetime import datetime
# Import entity-manager functions
sys.path.insert(0, str(Path(__file__).parent))
from importlib import import_module
# --- Paths and constants -------------------------------------------------
SCRIPT_DIR = Path(__file__).parent
LOG_DIR = Path.home() / "clawd" / "logs"  # where the extraction log lives
LOG_FILE = LOG_DIR / "entity-extraction.log"
KNOWLEDGE_DIR = Path.home() / ".cortex" / "knowledge"  # knowledge-graph store
ENTITIES_FILE = KNOWLEDGE_DIR / "entities.json"
RELATIONSHIPS_FILE = KNOWLEDGE_DIR / "relationships.json"
NATS_STREAM = "openclaw-events"  # JetStream stream queried via the `nats` CLI
CONSUMER_NAME = "kg-extractor-temp"  # NOTE(review): defined but never used in this file
# Setup logging: mirror every record to the log file and to the console.
LOG_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger("smart-extractor")
def load_json(path):
    """Best-effort JSON reader: return the parsed object, or {} when the
    file is missing or does not contain valid JSON."""
    try:
        raw = Path(path).read_text()
    except FileNotFoundError:
        return {}
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {}
def save_json(path, data):
    """Write *data* to *path* as pretty-printed UTF-8 JSON.

    Creates any missing parent directories first.

    Fix: open with an explicit ``encoding="utf-8"`` — the dump uses
    ``ensure_ascii=False`` (raw non-ASCII characters), which raised
    UnicodeEncodeError on platforms whose default locale encoding is
    not UTF-8.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
def importance_heuristic(text):
    """Simple importance scoring (0-1) based on content heuristics.

    Starts from a 0.3 base, boosts long and capitalized-word-rich text
    and business vocabulary, penalizes heartbeat/cron noise, then clamps
    to [0.0, 1.0]. Returns 0.0 for empty/None input.

    Fix: ``text.lower()`` was recomputed for every noise pattern and
    every boost word; it is now computed once, and the noise patterns
    are stored pre-lowercased (per-pattern ``p.lower()`` removed).
    """
    if not text:
        return 0.0
    lowered = text.lower()  # hoisted: reused by both scan loops below
    score = 0.3  # base
    # Boost for substantive content
    if len(text) > 200:
        score += 0.1
    if len(text) > 500:
        score += 0.1
    # Boost for entity-rich content (count of Capitalized words)
    caps = len(re.findall(r"\b[A-Z][a-z]+\b", text))
    if caps > 3:
        score += 0.1
    if caps > 8:
        score += 0.1
    # Penalize heartbeat/cron noise (already lowercase).
    # NOTE(review): "heartbeat_ok" also contains "heartbeat", so a
    # HEARTBEAT_OK message is penalized twice (-0.6) — preserved as-is.
    noise_patterns = ["heartbeat_ok", "heartbeat", "cron:", "health check", "no critical"]
    for p in noise_patterns:
        if p in lowered:
            score -= 0.3
    # Boost for business/project content (+0.05 per distinct word found)
    boost_words = ["meeting", "project", "company", "contract", "decision",
                   "strategy", "budget", "deadline", "milestone", "partnership",
                   "investment", "revenue", "client", "proposal", "agreement"]
    for w in boost_words:
        if w in lowered:
            score += 0.05
    return max(0.0, min(1.0, score))
def fetch_events_nats(last=None, since=None):
    """Fetch conversation events from the NATS stream via the ``nats`` CLI.

    Reads messages with ``nats stream get``, keeps only those whose subject
    contains "conversation_message_in", base64-decodes and JSON-parses the
    payload, and returns the resulting list of dicts. Returns [] when the
    CLI is missing, times out, or the stream cannot be queried.

    Args:
        last: fetch roughly this many most-recent sequences (default 500).
        since: duration string such as "6h"; used both to estimate the
            starting sequence and as a final timestamp filter.

    Fixes:
        - ``parse_since`` returns an absolute epoch cutoff (now - duration),
          but the old code multiplied that timestamp by 1000 as if it were
          a duration, so the rate-based estimate always collapsed to
          ``start_seq``. The age in ms is now ``(now - cutoff) * 1000``.
        - ``import base64`` hoisted out of the per-message loop.
        - Removed unused local ``filter_subj``.
    """
    import base64  # hoisted: was re-imported on every loop iteration

    events = []
    try:
        # Stream info gives us the valid [first_seq, last_seq] range.
        info_result = subprocess.run(
            ["nats", "stream", "info", NATS_STREAM, "--json"],
            capture_output=True, text=True, timeout=10
        )
        if info_result.returncode != 0:
            log.error("Failed to get stream info")
            return events
        info = json.loads(info_result.stdout)
        end_seq = info["state"]["last_seq"]
        start_seq = info["state"]["first_seq"]
        # Calculate range
        count = last or 500
        cutoff = parse_since(since) if since else None
        if cutoff is not None:
            # Estimate the starting sequence from the stream's average
            # message rate over its whole lifetime.
            ms_since = (time.time() - cutoff) * 1000  # age of window in ms
            total_ms = (time.time() * 1000) - (datetime.fromisoformat(info["state"]["first_ts"].replace("Z", "+00:00")).timestamp() * 1000)
            total_msgs = end_seq - start_seq
            msgs_per_ms = total_msgs / total_ms if total_ms > 0 else 1
            # 1.2 = safety margin so the estimate overshoots slightly.
            fetch_start = max(start_seq, int(end_seq - ms_since * msgs_per_ms * 1.2))
        else:
            fetch_start = max(start_seq, end_seq - count)
        # Only fetch conversation messages
        log.info(f"Fetching sequences {fetch_start} - {end_seq}")
        # Sample at most ~count sequences evenly across the range.
        step = max(1, (end_seq - fetch_start) // count)
        for seq in range(fetch_start, end_seq + 1, step):
            try:
                result = subprocess.run(
                    ["nats", "stream", "get", NATS_STREAM, str(seq), "--json"],
                    capture_output=True, text=True, timeout=5
                )
                if result.returncode != 0:
                    continue
                msg = json.loads(result.stdout)
                subj = msg.get("subject", "")
                if "conversation_message_in" not in subj:
                    continue
                # Input validation: max size check (1MB)
                raw_data = msg.get("data", "")
                if len(raw_data) > 1_048_576:
                    log.warning("Skipping oversized message at seq %d (%d bytes)", seq, len(raw_data))
                    continue
                try:
                    decoded = base64.b64decode(raw_data)
                except Exception as e:
                    log.warning("Invalid base64 at seq %d: %s", seq, e)
                    continue
                try:
                    data = json.loads(decoded.decode("utf-8"))
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    log.warning("Invalid JSON at seq %d: %s", seq, e)
                    continue
                if not isinstance(data, dict):
                    log.warning("Expected dict at seq %d, got %s", seq, type(data).__name__)
                    continue
                events.append(data)
            except Exception:
                # Best-effort: one bad sequence must not abort the whole run.
                continue
        log.info(f"Fetched {len(events)} conversation events")
    except subprocess.TimeoutExpired:
        log.warning("NATS command timed out")
    except FileNotFoundError:
        log.warning("nats CLI not found — skipping NATS extraction")
    # Filter by time if --since specified (payload timestamps assumed epoch ms
    # — TODO confirm against the event producer).
    if since and events:
        cutoff = parse_since(since)
        if cutoff:
            events = [e for e in events if e.get("timestamp", 0) / 1000 >= cutoff]
    return events
def parse_since(since_str):
    """Turn a duration string like '6h', '1d' or '30m' into an epoch cutoff.

    Returns ``time.time() - duration`` in seconds, or None when the string
    does not start with digits followed by one of h/d/m.
    """
    if (m := re.match(r"(\d+)([hdm])", since_str)) is None:
        return None
    unit_seconds = {"h": 3600, "m": 60, "d": 86400}[m.group(2)]
    return time.time() - int(m.group(1)) * unit_seconds
def extract_from_event(event, known_entities):
    """Extract entities from a single event.

    Pulls the message text from the event payload, scores it with
    importance_heuristic, and — when the score reaches 0.4 — runs
    entity_manager.extract_entities over it.

    Returns:
        (found_entities, score): found_entities is {} when the text is
        empty or below the importance threshold.
    """
    # entity_manager.py lives next to this script; load it once and cache
    # it in sys.modules under a private name.
    em = sys.modules.get("entity_manager_mod")
    if em is None:
        import importlib.util
        mod_path = Path(__file__).parent / "entity_manager.py"
        spec = importlib.util.spec_from_file_location("entity_manager_mod", mod_path)
        em = importlib.util.module_from_spec(spec)
        sys.modules["entity_manager_mod"] = em
        spec.loader.exec_module(em)

    payload = event.get("payload", {})
    text = payload.get("text_preview", "") or payload.get("text", "")
    # Normalize to a plain string before scoring.
    if isinstance(text, list):
        text = " ".join(str(part) for part in text)
    elif not isinstance(text, str):
        text = str(text)
    if not text:
        return {}, 0.0

    score = importance_heuristic(text)
    if score < 0.4:
        # Not important enough to mine for entities.
        return {}, score
    return em.extract_entities(text, known_entities), score
def run_extraction(last=None, since=None, dry_run=False):
    """Main extraction pipeline.

    Loads entity_manager.py from this script's directory, fetches events
    from NATS, extracts entities per event, records co-occurrence
    relationships, and (unless dry_run) persists entities/relationships
    to the knowledge-graph JSON files.

    Args:
        last: process roughly the last N events.
        since: duration string ("6h") limiting how far back to fetch.
        dry_run: when True, nothing is written to disk.
    """
    log.info(f"Starting extraction (last={last}, since={since}, dry_run={dry_run})")
    # Load known entities — entity_manager.py is loaded fresh by file path
    # and registered under a private module name (extract_from_event reuses
    # this cache via sys.modules).
    spec_path = Path(__file__).parent / "entity_manager.py"
    import importlib.util
    spec = importlib.util.spec_from_file_location("entity_manager_mod", spec_path)
    em = importlib.util.module_from_spec(spec)
    sys.modules["entity_manager_mod"] = em
    spec.loader.exec_module(em)
    known = em.load_known_entities()
    log.info(f"Loaded {len(known)} known entities")
    # Fetch events
    events = fetch_events_nats(last=last, since=since)
    log.info(f"Fetched {len(events)} events from NATS")
    if not events:
        log.info("No events to process")
        return
    # NOTE(review): uses em.load_json/em.save_json, not this module's own
    # load_json/save_json helpers — presumably equivalent; verify.
    entities = em.load_json(ENTITIES_FILE)
    relationships = em.load_json(RELATIONSHIPS_FILE)
    total_extracted = 0
    new_entities = 0
    new_relationships = 0
    ts_now = time.strftime("%Y-%m-%dT%H:%M:%S")  # local time, no timezone
    for event in events:
        found, score = extract_from_event(event, known)
        if not found:
            continue
        total_extracted += len(found)
        names = list(found.keys())
        # Add new entities
        for name, info in found.items():
            if name not in entities:
                entities[name] = {
                    "type": info["type"],
                    "source": "nats-extraction",
                    "first_seen": ts_now,
                }
                new_entities += 1
                # Keep the in-memory known-entity map in sync so later
                # events in this run see the new entity.
                known[name] = entities[name]
        # Create co-occurrence relationships between entities found in same message
        if len(names) >= 2:
            for i in range(len(names)):
                for j in range(i + 1, min(len(names), i + 5)):  # limit pairs
                    # Canonical key: lexicographically smaller name first.
                    a, b = min(names[i], names[j]), max(names[i], names[j])
                    key = f"{a}::{b}"
                    if key in relationships:
                        relationships[key]["count"] = relationships[key].get("count", 1) + 1
                        relationships[key]["last_seen"] = ts_now
                    else:
                        relationships[key] = {
                            "a": a, "b": b,
                            "types": ["co-occurrence"],
                            "count": 1,
                            "first_seen": ts_now,
                            "last_seen": ts_now,
                        }
                        new_relationships += 1
        # NOTE(review): this periodic save only fires when total_extracted
        # lands exactly on a multiple of 50; since the counter advances by
        # len(found) per event, it can skip over multiples — confirm intent.
        if not dry_run and total_extracted % 50 == 0 and total_extracted > 0:
            # Periodic save
            em.save_json(ENTITIES_FILE, entities)
            em.save_json(RELATIONSHIPS_FILE, relationships)
    # Final save and summary.
    if not dry_run:
        em.save_json(ENTITIES_FILE, entities)
        em.save_json(RELATIONSHIPS_FILE, relationships)
    log.info(
        f"Done: {len(events)} events processed, {total_extracted} entities extracted, "
        f"{new_entities} new entities, {new_relationships} new relationships"
    )
    print(
        f"\nResults: {len(events)} events → {total_extracted} entities extracted, "
        f"{new_entities} new, {new_relationships} new relationships"
    )
def main():
last = None
since = None
dry_run = False
args = sys.argv[1:]
i = 0
while i < len(args):
if args[i] == "--last" and i + 1 < len(args):
last = int(args[i + 1])
i += 2
elif args[i] == "--since" and i + 1 < len(args):
since = args[i + 1]
i += 2
elif args[i] == "--dry-run":
dry_run = True
i += 1
else:
print(__doc__)
sys.exit(1)
if last is None and since is None:
last = 100 # default
run_extraction(last=last, since=since, dry_run=dry_run)
if __name__ == "__main__":
main()