darkplex-core/cortex/tracker.py
Claudia 0123ec7090
All checks were successful
Tests / test (push) Successful in 3s
fix: format specifier crash when stream_info is None
2026-02-09 12:51:56 +01:00

617 lines
19 KiB
Python

#!/usr/bin/env python3
"""Cortex Tracker — Commitment and Contradiction Detection.
Ported from ~/clawd/scripts/brain-tracker.mjs
Uses an LLM (configurable via CORTEX_LLM_URL) to analyze messages for:
- Commitments/promises (someone agreeing to do something)
- Factual claims (verifiable statements)
- Contradictions between old and new claims
Usage:
cortex track scan --since 24h
cortex track list
cortex track done <id>
cortex track check "statement"
"""
import argparse
import json
import os
import random
import re
import string
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import requests

from cortex.config import cortex_home, memory_dir
# --- Configuration ---
def _env(key: str, default: str = '') -> str:
return os.environ.get(key, default)
def llm_url() -> str:
    """URL of the LLM endpoint (OpenAI-compatible or Ollama); env: CORTEX_LLM_URL."""
    default_url = 'http://localhost:11434/api/generate'
    return _env('CORTEX_LLM_URL', default_url)
def llm_model() -> str:
    """Model identifier passed to the LLM backend; env: CORTEX_LLM_MODEL."""
    fallback = 'mistral:7b'
    return _env('CORTEX_LLM_MODEL', fallback)
def tracker_db_path() -> Path:
    """Location of the brain-tracker.json database inside the memory directory."""
    return memory_dir().joinpath('brain-tracker.json')
def get_nats_bin() -> str:
    """Return the first existing nats binary path, or bare 'nats' for a $PATH lookup."""
    candidates = (
        os.path.expanduser('~/bin/nats'),
        '/usr/local/bin/nats',
        '/usr/bin/nats',
    )
    found = next((p for p in candidates if os.path.exists(p)), None)
    return found if found is not None else 'nats'
# --- Database ---
def load_db() -> dict:
    """Load the tracker database, materializing processedIds as a set.

    Returns a fresh empty structure when the DB file does not exist yet.
    """
    path = tracker_db_path()
    if not path.exists():
        return {
            'commitments': [],
            'claims': [],
            'contradictions': [],
            'processedIds': set(),
            'lastRun': None,
        }
    data = json.loads(path.read_text())
    # JSON has no set type, so processedIds is stored on disk as a list.
    data['processedIds'] = set(data.get('processedIds', []))
    return data
def save_db(db: dict) -> None:
    """Persist the tracker database as pretty-printed JSON."""
    path = tracker_db_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    serializable = dict(db)
    # Sets are not JSON-serializable; store processedIds as a list.
    serializable['processedIds'] = list(db['processedIds'])
    path.write_text(json.dumps(serializable, indent=2, ensure_ascii=False))
# --- LLM Integration ---
def query_llm(prompt: str, timeout: int = 25) -> str:
    """Send *prompt* to the configured LLM and return its text reply.

    Supports two wire formats, selected by inspecting the URL: Ollama's
    /api/generate shape and the OpenAI chat-completions shape. Returns ''
    on any HTTP error, non-200 status, or parsing failure.
    """
    url = llm_url()
    model = llm_model()
    is_ollama = '/api/generate' in url

    if is_ollama:
        request_kwargs = {
            'json': {
                'model': model,
                'prompt': prompt,
                'stream': False,
                'options': {
                    'temperature': 0.3,
                    'num_predict': 500,
                },
            },
        }
    else:
        request_kwargs = {
            'headers': {
                'Authorization': f"Bearer {_env('CORTEX_LLM_API_KEY', '')}",
                'Content-Type': 'application/json',
            },
            'json': {
                'model': model,
                'messages': [{'role': 'user', 'content': prompt}],
                'temperature': 0.3,
                'max_tokens': 500,
            },
        }

    try:
        response = requests.post(url, timeout=timeout, **request_kwargs)
        if response.status_code == 200:
            if is_ollama:
                return response.json().get('response', '')
            return response.json()['choices'][0]['message']['content']
    except Exception as e:
        print(f"⚠️ LLM error: {e}", file=sys.stderr)
    return ''
def query_ollama_cli(prompt: str, timeout: int = 25) -> str:
    """Fallback: pipe *prompt* through `ollama run <model>`; '' on any error."""
    try:
        proc = subprocess.run(
            ['ollama', 'run', llm_model()],
            input=prompt,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except Exception:
        return ''
    return proc.stdout or ''
# --- Event Fetching ---
def fetch_nats_events(subject: str, count: int = 100) -> list[dict]:
    """Pull up to *count* recent events for *subject* from the openclaw-events stream.

    Probes the stream tail to find the last sequence number, creates a
    throwaway pull consumer positioned *count* messages earlier, drains it,
    and removes it again. Returns the parsed JSON events; any NATS failure
    is reported on stderr and yields whatever was collected so far.
    """
    nats_bin = get_nats_bin()
    events: list[dict] = []

    def nats(args, timeout, text=False):
        # Thin wrapper so each invocation below stays on one line.
        return subprocess.run(
            [nats_bin, *args], capture_output=True, text=text, timeout=timeout
        )

    try:
        # Probe the tail of the stream to locate the last sequence for this subject.
        probe = nats(['stream', 'get', 'openclaw-events', f'--last-for={subject}'],
                     5, text=True)
        match = re.search(r'openclaw-events#(\d+)', probe.stdout or '')
        if not match:
            return []
        start_seq = max(1, int(match.group(1)) - count)

        # Ephemeral consumer: remove a possible leftover, create, drain, remove.
        consumer_name = f'brain-{int(datetime.now().timestamp())}'
        nats(['consumer', 'rm', 'openclaw-events', consumer_name, '--force'], 3)
        nats(['consumer', 'add', 'openclaw-events', consumer_name,
              '--pull', f'--deliver={start_seq}', '--ack=none',
              f'--filter={subject}', '--defaults'], 5)
        drained = nats(['consumer', 'next', 'openclaw-events', consumer_name,
                        '--count', str(count), '--raw'], 15, text=True)
        nats(['consumer', 'rm', 'openclaw-events', consumer_name, '--force'], 3)

        for raw in (drained.stdout or '').split('\n'):
            raw = raw.strip()
            if not raw.startswith('{'):
                continue
            try:
                events.append(json.loads(raw))
            except json.JSONDecodeError:
                continue
    except Exception as e:
        print(f"⚠️ NATS error: {e}", file=sys.stderr)
    return events
def load_jsonl_events(source: str, hours: int = 24) -> list[dict]:
    """Load events from a JSONL file, or from stdin when *source* is '-'.

    Keeps events whose millisecond timestamp falls within the last *hours*
    hours; events without a timestamp (ts == 0) are always kept. Blank and
    malformed lines are skipped. Returns [] when the file does not exist.
    """
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)
    if source == '-':
        raw = sys.stdin.read()
    else:
        path = Path(source)
        if not path.exists():
            return []
        # read_text() always closes the file; the original opened a handle and
        # only closed it after the loop, leaking it if a line raised mid-parse.
        raw = path.read_text()

    events = []
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        # 'timestamp' preferred, 'timestampMs' as legacy fallback.
        ts = event.get('timestamp', event.get('timestampMs', 0))
        if ts == 0 or ts >= cutoff:
            events.append(event)
    return events
# --- Text Processing ---
def extract_text(event: dict) -> str:
    """Pull the message text out of an event payload, trying known layouts in order."""
    payload = event.get('payload', {})
    # Layout 1: payload.text_preview = [{"text": ...}, ...]
    previews = payload.get('text_preview')
    if previews:
        first = previews[0]
        if isinstance(first, dict):
            return first.get('text', '')
    # Layout 2: payload.data.text
    data = payload.get('data', {})
    if data.get('text'):
        return data['text']
    # Layout 3: payload.text
    if payload.get('text'):
        return payload['text']
    return ''
def clean_text(text: str) -> str:
    """Normalize a message for LLM analysis.

    Strips bracketed tags, table pipes, markdown decoration, and URLs,
    collapses all whitespace to single spaces, and caps the result at
    250 characters.
    """
    substitutions = (
        (r'\[.*?\]', ''),        # [timestamps], [message_id]
        (r'\|', ' '),            # table separators
        (r'[*_`#]', ''),         # markdown decoration
        (r'https?://\S+', ''),   # URLs
        (r'\n+', ' '),           # newlines
        (r'\s+', ' '),           # runs of whitespace
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()[:250]
# --- Analysis ---
def analyze_message(text: str, existing_claims: list[dict]) -> Optional[dict]:
    """Ask the LLM whether *text* contains commitments or factual claims.

    Returns the parsed JSON analysis, or None when the message is too short,
    looks like housekeeping noise, or the LLM reply contains no parseable
    JSON. (*existing_claims* is accepted for interface compatibility but is
    not used by the analysis prompt.)
    """
    clean = clean_text(text)
    # Skip trivial or automated traffic before spending an LLM round-trip.
    if len(clean) < 20 or 'HEARTBEAT' in clean or 'cron:' in clean:
        return None
    prompt = f'''Analyze this message for:
1. Commitments/promises (someone agreeing to do something)
2. Factual claims (verifiable statements)
Message: "{clean}"
Respond with JSON only:
{{
"commitment": {{"found": false}} or {{"found": true, "what": "description", "who": "user/assistant", "deadline": "if mentioned"}},
"claims": [] or [{{"claim": "statement", "topic": "category", "entities": ["names"]}}]
}}'''
    response = query_llm(prompt) or query_ollama_cli(prompt)
    # The model may wrap its JSON in prose; extract the outermost {...} span.
    json_match = re.search(r'\{[\s\S]*\}', response)
    if json_match is None:
        return None
    try:
        return json.loads(json_match.group(0))
    except json.JSONDecodeError:
        return None
def check_contradiction(new_claim: dict, existing_claims: list[dict]) -> Optional[dict]:
    """Ask the LLM whether *new_claim* contradicts any related existing claim.

    Claims are "related" when any entity name of one is a case-insensitive
    substring of an entity name of the other. Only the first two related
    claims are checked to bound LLM round-trips. Returns the LLM verdict
    merged with the contradicted claim under 'original', or None.
    """
    new_entities = [e.lower() for e in new_claim.get('entities', [])]

    def shares_entity(old: dict) -> bool:
        old_entities = [e.lower() for e in old.get('entities', [])]
        return any(ne in oe or oe in ne for ne in new_entities for oe in old_entities)

    related = [c for c in existing_claims if shares_entity(c)]
    if not related:
        return None

    for old in related[:2]:  # Check only first 2
        prompt = f'''Do these contradict?
Old ({old.get("date", "unknown")}): "{old.get("claim", "")}"
New: "{new_claim.get("claim", "")}"
Respond: {{"contradicts": true/false, "why": "explanation", "severity": "minor/major"}}'''
        response = query_llm(prompt) or query_ollama_cli(prompt)
        json_match = re.search(r'\{[\s\S]*?\}', response)
        if not json_match:
            continue
        try:
            verdict = json.loads(json_match.group(0))
        except json.JSONDecodeError:
            continue
        if verdict.get('contradicts'):
            return {**verdict, 'original': old}
    return None
# --- Commands ---
def _new_id(prefix: str) -> str:
    """Build a quasi-unique record id: '<prefix>-<unix-ts>-<4 random letters>'."""
    suffix = ''.join(random.choices(string.ascii_lowercase, k=4))
    return f"{prefix}-{int(datetime.now().timestamp())}-{suffix}"


def cmd_scan(hours: int = 24, jsonl: Optional[str] = None) -> None:
    """Scan recent messages for commitments, claims, and contradictions.

    Events come from NATS by default, or from a JSONL file/stdin when
    *jsonl* is given. Each unprocessed event is analyzed via the LLM;
    findings are appended to the tracker DB, which is saved incrementally
    every 10 events and once more at the end. Progress and a summary are
    written to stderr.

    Fix vs. original: `import random` / `import string` were executed inside
    the per-event loop (twice), with the id-generation expression duplicated;
    both now live in the module-level `_new_id` helper.
    """
    db = load_db()
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)
    print(f"🧠 Brain Tracker — Scanning last {hours}h\n", file=sys.stderr)

    # Fetch events
    if jsonl:
        all_events = load_jsonl_events(jsonl, hours)
    else:
        print("📥 Fetching messages...", file=sys.stderr)
        in_events = fetch_nats_events('openclaw.events.main.conversation_message_in', 200)
        out_events = fetch_nats_events('openclaw.events.main.conversation_message_out', 200)
        all_events = in_events + out_events

    # Keep only fresh, not-yet-processed events, oldest first.
    events = [
        e for e in all_events
        if e.get('timestamp', 0) >= cutoff and e.get('id') not in db['processedIds']
    ]
    events.sort(key=lambda x: x.get('timestamp', 0))
    print(f" Found {len(events)} new events to analyze\n", file=sys.stderr)

    commitments = 0
    claims = 0
    contradictions = 0
    for i, event in enumerate(events):
        text = extract_text(event)
        clean = clean_text(text)
        if len(clean) < 20:
            db['processedIds'].add(event.get('id', ''))
            continue
        print(f"\r[{i+1}/{len(events)}] Analyzing...", end='', file=sys.stderr)
        analysis = analyze_message(text, db['claims'])
        if not analysis:
            db['processedIds'].add(event.get('id', ''))
            continue
        date = datetime.fromtimestamp(event.get('timestamp', 0) / 1000).strftime('%Y-%m-%d')

        # Process commitment
        if analysis.get('commitment', {}).get('found'):
            c = analysis['commitment']
            db['commitments'].append({
                'id': _new_id('commit'),
                'what': c.get('what', ''),
                'who': c.get('who', ''),
                'deadline': c.get('deadline'),
                'source': clean[:100],
                'date': date,
                'status': 'open',
            })
            commitments += 1
            print(f"\n✅ Commitment: \"{c.get('what', '')}\"", file=sys.stderr)

        # Process claims
        for claim in analysis.get('claims', []):
            if not claim.get('claim'):
                continue
            new_claim = {
                'id': _new_id('claim'),
                'claim': claim['claim'],
                'topic': claim.get('topic', 'general'),
                'entities': claim.get('entities', []),
                'source': clean[:100],
                'date': date,
            }
            # Check for contradictions against everything recorded so far.
            contra = check_contradiction(new_claim, db['claims'])
            if contra:
                db['contradictions'].append({
                    'id': f"contra-{int(datetime.now().timestamp())}",
                    'new_claim': new_claim,
                    'old_claim': contra['original'],
                    'why': contra.get('why', ''),
                    'severity': contra.get('severity', 'unknown'),
                    'date': date,
                    'status': 'unresolved',
                })
                contradictions += 1
                print(f"\n⚠️ CONTRADICTION: {contra.get('why', '')}", file=sys.stderr)
            db['claims'].append(new_claim)
            claims += 1

        db['processedIds'].add(event.get('id', ''))
        # Save periodically so a crash mid-scan loses at most ~10 events of work.
        if i % 10 == 0:
            save_db(db)

    db['lastRun'] = datetime.now().isoformat()
    save_db(db)
    print(f"\n\n📊 Results:", file=sys.stderr)
    print(f" New Commitments: {commitments}", file=sys.stderr)
    print(f" New Claims: {claims}", file=sys.stderr)
    print(f" Contradictions: {contradictions}", file=sys.stderr)
    print(f" Total in DB: {len(db['commitments'])} commitments, {len(db['claims'])} claims", file=sys.stderr)
def cmd_list() -> None:
    """Print open commitments and unresolved contradictions to stdout."""
    db = load_db()
    open_commits = [c for c in db.get('commitments', []) if c.get('status') == 'open']
    unresolved = [c for c in db.get('contradictions', []) if c.get('status') == 'unresolved']
    print("🧠 Brain Tracker Status\n")

    if not open_commits:
        print("✅ No open commitments\n")
    else:
        print(f"📋 Open Commitments ({len(open_commits)}):\n")
        for c in open_commits:
            # Show only the id's tail — enough for `cortex track done <fragment>`.
            print(f" [{c['id'][-6:]}] {c.get('what', '')}")
            deadline = f" | ⏰ {c['deadline']}" if c.get('deadline') else ''
            print(f" 👤 {c.get('who', 'unknown')} | 📅 {c.get('date', '')}{deadline}\n")

    if not unresolved:
        print("✅ No contradictions\n")
    else:
        print(f"⚠️ Contradictions ({len(unresolved)}):\n")
        for c in unresolved:
            severity = c.get('severity', 'UNKNOWN').upper()
            print(f" [{severity}] {c.get('why', '')}")
            print(f" Old: \"{c.get('old_claim', {}).get('claim', '')}\"")
            print(f" New: \"{c.get('new_claim', {}).get('claim', '')}\"\n")

    print(f"📊 Total: {len(db.get('claims', []))} claims, {len(db.get('commitments', []))} commitments")
def cmd_done(id_fragment: str) -> None:
    """Mark the first commitment whose id contains *id_fragment* as done."""
    db = load_db()
    for commit in db['commitments']:
        if id_fragment in commit['id']:
            commit['status'] = 'done'
            commit['completed'] = datetime.now().isoformat()
            save_db(db)
            print(f"✅ Marked done: {commit.get('what', '')}")
            return
    print(f"❌ Not found: {id_fragment}")
def cmd_check(statement: str) -> None:
    """Search stored claims for *statement* (case-insensitive substring match)."""
    db = load_db()
    q = statement.lower()

    def matches(c: dict) -> bool:
        # Match against the claim text or any of its entity names.
        if q in c.get('claim', '').lower():
            return True
        return any(q in e.lower() for e in c.get('entities', []))

    found = [c for c in db.get('claims', []) if matches(c)]
    if not found:
        print(f"No claims matching: {statement}")
        return
    print(f"🔍 Claims matching \"{statement}\" ({len(found)}):\n")
    for c in found[-15:]:  # newest 15 only
        print(f"[{c.get('date', '')}] {c.get('claim', '')}")
        if c.get('entities'):
            print(f" Entities: {', '.join(c['entities'])}")
        print()
# --- Main ---
def main():
    """CLI entry point: parse arguments and dispatch to the track subcommands."""
    parser = argparse.ArgumentParser(
        description='Commitment and Contradiction Detection',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Commands:
scan Analyze recent messages for commitments and claims
list Show open commitments and contradictions
done Mark a commitment as completed
check Search claims for a statement
Examples:
cortex track scan --since 24h
cortex track list
cortex track done commit-abc123
cortex track check "TypeDB"
'''
    )
    sub = parser.add_subparsers(dest='command')
    # scan
    scan_p = sub.add_parser('scan', help='Scan messages for commitments/claims')
    scan_p.add_argument('--since', default='24h',
                        help='Time window (e.g. 24h, 7d)')
    scan_p.add_argument('--jsonl', metavar='FILE',
                        help='Load events from JSONL file')
    # list
    sub.add_parser('list', help='Show open items')
    # done
    done_p = sub.add_parser('done', help='Mark commitment as done')
    done_p.add_argument('id', help='Commitment ID (or fragment)')
    # check
    check_p = sub.add_parser('check', help='Search claims')
    check_p.add_argument('statement', nargs='+', help='Statement to search')
    args = parser.parse_args()

    if args.command == 'scan':
        # Parse duration: <number><h|d|m>
        match = re.match(r'^(\d+)([hdm])$', args.since.lower())
        if not match:
            print(f"❌ Invalid duration: {args.since}", file=sys.stderr)
            sys.exit(1)
        value = int(match.group(1))
        unit = match.group(2)
        hours = value if unit == 'h' else (value * 24 if unit == 'd' else value / 60)
        # FIX: int(hours) truncated minute windows (e.g. "30m" -> 0 hours, so
        # nothing was scanned). timedelta accepts fractional hours, so pass the
        # float through for minutes; h/d values are already whole numbers.
        cmd_scan(hours if unit == 'm' else int(hours), args.jsonl)
    elif args.command == 'list':
        cmd_list()
    elif args.command == 'done':
        cmd_done(args.id)
    elif args.command == 'check':
        cmd_check(' '.join(args.statement))
    else:
        parser.print_help()


if __name__ == '__main__':
    main()