Some checks failed
Tests / test (push) Failing after 2s
- Merged all unique darkplex-core modules into cortex: - intelligence/ subfolder (anticipator, collective, shared_memory, knowledge_cleanup, temporal, llm_extractor, loop) - governance/ subfolder (policy engine, risk scorer, evidence, enforcer, report generator) - entity_manager.py, knowledge_extractor.py - Fixed bare 'from intelligence.' imports to 'from cortex.intelligence.' - Added 'darkplex' CLI alias alongside 'cortex' - Package renamed to darkplex-core v0.2.0 - 405 tests passing (was 234) - 14 new test files covering all merged modules
345 lines
11 KiB
Python
Executable file
345 lines
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Smart Extractor — Extract entities from NATS events and update knowledge graph.
|
|
Part of Level 4.4 AGI Roadmap.
|
|
|
|
Usage:
|
|
smart-extractor.py --last 100 — Process last N events
|
|
smart-extractor.py --since 6h — Process events from last 6 hours
|
|
smart-extractor.py --dry-run — Show what would be extracted without saving
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import re
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# Import entity-manager functions
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
from importlib import import_module
|
|
|
|
# --- Filesystem locations for logs and the knowledge-graph JSON files ---
SCRIPT_DIR = Path(__file__).parent

LOG_DIR = Path.home() / "clawd" / "logs"

LOG_FILE = LOG_DIR / "entity-extraction.log"

KNOWLEDGE_DIR = Path.home() / ".cortex" / "knowledge"

ENTITIES_FILE = KNOWLEDGE_DIR / "entities.json"  # entity name -> metadata dict

RELATIONSHIPS_FILE = KNOWLEDGE_DIR / "relationships.json"  # "a::b" key -> co-occurrence record

# --- NATS configuration ---
NATS_STREAM = "openclaw-events"

CONSUMER_NAME = "kg-extractor-temp"  # NOTE(review): not referenced elsewhere in this file — confirm before removing

# Setup logging: write to both the log file and the console stream
LOG_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(),
    ],
)

log = logging.getLogger("smart-extractor")
|
|
|
|
|
|
def load_json(path):
    """Read JSON from *path*, returning {} when the file is absent or invalid."""
    try:
        handle = open(path)
    except FileNotFoundError:
        return {}
    with handle:
        try:
            return json.load(handle)
        except json.JSONDecodeError:
            return {}
|
|
|
|
|
|
def save_json(path, data):
    """Write *data* as pretty-printed JSON, creating parent dirs as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    with open(path, "w") as handle:
        handle.write(serialized)
|
|
|
|
|
|
def importance_heuristic(text):
    """Score text importance in [0, 1] using simple content heuristics.

    Scoring: base 0.3; length adds up to +0.2; capitalized-word density adds
    up to +0.2; each matched business keyword adds +0.05; each matched
    heartbeat/cron noise pattern subtracts 0.3. Result is clamped to [0, 1].

    Args:
        text: Arbitrary message text; falsy input (None, "") scores 0.0.

    Returns:
        A float in [0.0, 1.0].
    """
    if not text:
        return 0.0

    score = 0.3  # base
    lowered = text.lower()  # hoisted: was recomputed on every loop iteration

    # Boost for substantive content
    if len(text) > 200:
        score += 0.1
    if len(text) > 500:
        score += 0.1

    # Boost for entity-rich content (count of Capitalized words)
    caps = len(re.findall(r"\b[A-Z][a-z]+\b", text))
    if caps > 3:
        score += 0.1
    if caps > 8:
        score += 0.1

    # Penalize heartbeat/cron noise
    noise_patterns = ["HEARTBEAT_OK", "heartbeat", "cron:", "health check", "no critical"]
    for p in noise_patterns:
        if p.lower() in lowered:
            score -= 0.3

    # Boost for business/project content
    boost_words = ["meeting", "project", "company", "contract", "decision",
                   "strategy", "budget", "deadline", "milestone", "partnership",
                   "investment", "revenue", "client", "proposal", "agreement"]
    for w in boost_words:
        if w in lowered:
            score += 0.05

    return max(0.0, min(1.0, score))
|
|
|
|
|
|
def fetch_events_nats(last=None, since=None):
    """Fetch conversation events from the NATS stream via the `nats` CLI.

    Samples up to ~`last` (default 500) messages across the stream's
    sequence range with direct `nats stream get` calls, keeping only
    subjects containing "conversation_message_in".

    Args:
        last: Approximate number of most-recent events to sample.
        since: Duration string like "6h"; only events newer than this are kept
            (rough sequence estimate first, exact timestamp filter at the end).

    Returns:
        List of decoded event dicts; empty on any CLI/stream failure.
    """
    import base64  # hoisted out of the per-message loop

    events = []

    # Use direct stream get instead of a consumer (more reliable)
    try:
        # Get stream info for the available sequence range
        info_result = subprocess.run(
            ["nats", "stream", "info", NATS_STREAM, "--json"],
            capture_output=True, text=True, timeout=10
        )
        if info_result.returncode != 0:
            log.error("Failed to get stream info")
            return events

        info = json.loads(info_result.stdout)
        end_seq = info["state"]["last_seq"]
        start_seq = info["state"]["first_seq"]

        # Calculate the sequence range to scan
        count = last or 500
        cutoff = parse_since(since) if since else None
        if cutoff is not None:
            # BUGFIX: parse_since returns an epoch *timestamp*, not a duration.
            # The elapsed window in ms is (now - cutoff); the previous code
            # multiplied the raw timestamp by 1000, which made fetch_start
            # always collapse to start_seq (scanning the whole stream), and
            # crashed with TypeError when parse_since returned None.
            ms_since = (time.time() - cutoff) * 1000
            total_ms = (time.time() * 1000) - (datetime.fromisoformat(info["state"]["first_ts"].replace("Z", "+00:00")).timestamp() * 1000)
            total_msgs = end_seq - start_seq
            msgs_per_ms = total_msgs / total_ms if total_ms > 0 else 1
            # 1.2 = safety margin; the exact timestamp filter below trims excess.
            fetch_start = max(start_seq, int(end_seq - ms_since * msgs_per_ms * 1.2))
        else:
            fetch_start = max(start_seq, end_seq - count)

        # Sample at most ~count sequences across the range
        log.info(f"Fetching sequences {fetch_start} - {end_seq}")
        step = max(1, (end_seq - fetch_start) // count)

        for seq in range(fetch_start, end_seq + 1, step):
            try:
                result = subprocess.run(
                    ["nats", "stream", "get", NATS_STREAM, str(seq), "--json"],
                    capture_output=True, text=True, timeout=5
                )
                if result.returncode != 0:
                    continue
                msg = json.loads(result.stdout)
                subj = msg.get("subject", "")
                if "conversation_message_in" not in subj:
                    continue
                # Input validation: max size check (1MB)
                raw_data = msg.get("data", "")
                if len(raw_data) > 1_048_576:
                    log.warning("Skipping oversized message at seq %d (%d bytes)", seq, len(raw_data))
                    continue
                try:
                    decoded = base64.b64decode(raw_data)
                except Exception as e:
                    log.warning("Invalid base64 at seq %d: %s", seq, e)
                    continue
                try:
                    data = json.loads(decoded.decode("utf-8"))
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    log.warning("Invalid JSON at seq %d: %s", seq, e)
                    continue
                if not isinstance(data, dict):
                    log.warning("Expected dict at seq %d, got %s", seq, type(data).__name__)
                    continue
                events.append(data)
            except Exception:
                # Best-effort sampling: a single bad sequence must not abort the scan
                continue

        log.info(f"Fetched {len(events)} conversation events")

    except subprocess.TimeoutExpired:
        log.warning("NATS command timed out")
    except FileNotFoundError:
        log.warning("nats CLI not found — skipping NATS extraction")

    # Exact time filter when --since was given (the sequence estimate is rough)
    if since and events:
        cutoff = parse_since(since)
        if cutoff:
            events = [e for e in events if e.get("timestamp", 0) / 1000 >= cutoff]

    return events
|
|
|
|
|
|
def parse_since(since_str):
    """Parse duration string like '6h', '1d', '30m' to epoch timestamp.

    Returns the epoch cutoff (now minus the duration), or None when the
    string does not start with <digits><h|d|m>.
    """
    match = re.match(r"(\d+)([hdm])", since_str)
    if match is None:
        return None
    amount = int(match.group(1))
    unit_seconds = {"h": 3600, "d": 86400, "m": 60}[match.group(2)]
    return time.time() - amount * unit_seconds
|
|
|
|
|
|
def extract_from_event(event, known_entities):
    """Extract entities from a single event.

    Returns a (found_entities, importance_score) tuple. found_entities is
    empty when the event carries no text or scores below the 0.4 threshold.
    """
    # Reuse the entity_manager module if a previous call already loaded it.
    em = sys.modules.get("entity_manager_mod")
    if not em:
        import importlib.util

        # Load entity_manager.py by path — it sits next to this script.
        spec_path = Path(__file__).parent / "entity_manager.py"
        spec = importlib.util.spec_from_file_location("entity_manager_mod", spec_path)
        em = importlib.util.module_from_spec(spec)
        sys.modules["entity_manager_mod"] = em
        spec.loader.exec_module(em)

    payload = event.get("payload", {})
    text = payload.get("text_preview", "") or payload.get("text", "")
    # Normalize non-string payloads into a single string.
    if isinstance(text, list):
        text = " ".join(str(part) for part in text)
    elif not isinstance(text, str):
        text = str(text)

    if not text:
        return {}, 0.0

    score = importance_heuristic(text)
    if score < 0.4:
        return {}, score

    return em.extract_entities(text, known_entities), score
|
|
|
|
|
|
def run_extraction(last=None, since=None, dry_run=False):
    """Main extraction pipeline.

    Loads entity_manager.py from this script's directory, fetches conversation
    events from NATS, extracts entities per event, records co-occurrence
    relationships between entities in the same message, and persists both
    JSON files (unless dry_run).

    Args:
        last: Number of most-recent events to process.
        since: Duration string such as "6h", forwarded to fetch_events_nats.
        dry_run: When True, nothing is written to disk.
    """
    log.info(f"Starting extraction (last={last}, since={since}, dry_run={dry_run})")

    # Load entity_manager.py by file path (it lives next to this script and
    # is not importable as a regular package module).
    spec_path = Path(__file__).parent / "entity_manager.py"
    import importlib.util
    spec = importlib.util.spec_from_file_location("entity_manager_mod", spec_path)
    em = importlib.util.module_from_spec(spec)
    sys.modules["entity_manager_mod"] = em
    spec.loader.exec_module(em)

    known = em.load_known_entities()
    log.info(f"Loaded {len(known)} known entities")

    # Fetch events
    events = fetch_events_nats(last=last, since=since)
    log.info(f"Fetched {len(events)} events from NATS")

    if not events:
        log.info("No events to process")
        return

    # Current graph state; em.load_json presumably returns {} when files are
    # missing (matches this file's own load_json) — confirm in entity_manager.
    entities = em.load_json(ENTITIES_FILE)
    relationships = em.load_json(RELATIONSHIPS_FILE)

    total_extracted = 0
    new_entities = 0
    new_relationships = 0
    ts_now = time.strftime("%Y-%m-%dT%H:%M:%S")  # local time, no timezone suffix

    for event in events:
        # score is unused here; thresholding already happens inside extract_from_event
        found, score = extract_from_event(event, known)
        if not found:
            continue

        total_extracted += len(found)
        names = list(found.keys())

        # Add new entities (also grow `known` so later events can match them)
        for name, info in found.items():
            if name not in entities:
                entities[name] = {
                    "type": info["type"],
                    "source": "nats-extraction",
                    "first_seen": ts_now,
                }
                new_entities += 1
                known[name] = entities[name]

        # Create co-occurrence relationships between entities found in same message
        if len(names) >= 2:
            for i in range(len(names)):
                for j in range(i + 1, min(len(names), i + 5)): # limit pairs
                    # Canonical ordering so "A::B" and "B::A" share one record
                    a, b = min(names[i], names[j]), max(names[i], names[j])
                    key = f"{a}::{b}"
                    if key in relationships:
                        relationships[key]["count"] = relationships[key].get("count", 1) + 1
                        relationships[key]["last_seen"] = ts_now
                    else:
                        relationships[key] = {
                            "a": a, "b": b,
                            "types": ["co-occurrence"],
                            "count": 1,
                            "first_seen": ts_now,
                            "last_seen": ts_now,
                        }
                        new_relationships += 1

        if not dry_run and total_extracted % 50 == 0 and total_extracted > 0:
            # Periodic save: a crash mid-run loses at most ~50 extractions
            em.save_json(ENTITIES_FILE, entities)
            em.save_json(RELATIONSHIPS_FILE, relationships)

    if not dry_run:
        em.save_json(ENTITIES_FILE, entities)
        em.save_json(RELATIONSHIPS_FILE, relationships)

    log.info(
        f"Done: {len(events)} events processed, {total_extracted} entities extracted, "
        f"{new_entities} new entities, {new_relationships} new relationships"
    )
    print(
        f"\nResults: {len(events)} events → {total_extracted} entities extracted, "
        f"{new_entities} new, {new_relationships} new relationships"
    )
|
|
|
|
|
|
def main():
    """Parse command-line arguments and run the extraction pipeline.

    Recognized flags: --last N, --since <dur>, --dry-run. Prints usage and
    exits with status 1 on any unknown argument or a non-integer --last.
    Defaults to the last 100 events when neither --last nor --since is given.
    """
    last = None
    since = None
    dry_run = False

    args = sys.argv[1:]
    i = 0
    while i < len(args):
        if args[i] == "--last" and i + 1 < len(args):
            try:
                last = int(args[i + 1])
            except ValueError:
                # Robustness fix: report bad input instead of an uncaught traceback
                print(f"Error: --last expects an integer, got {args[i + 1]!r}")
                sys.exit(1)
            i += 2
        elif args[i] == "--since" and i + 1 < len(args):
            since = args[i + 1]
            i += 2
        elif args[i] == "--dry-run":
            dry_run = True
            i += 1
        else:
            print(__doc__)
            sys.exit(1)

    if last is None and since is None:
        last = 100  # default

    run_extraction(last=last, since=since, dry_run=dry_run)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|