#!/usr/bin/env python3
"""Cortex Tracker — Commitment and Contradiction Detection.

Ported from ~/clawd/scripts/brain-tracker.mjs

Uses an LLM (configurable via CORTEX_LLM_URL) to analyze messages for:
- Commitments/promises (someone agreeing to do something)
- Factual claims (verifiable statements)
- Contradictions between old and new claims

Usage:
    cortex track scan --since 24h
    cortex track list
    cortex track done <id>
    cortex track check "statement"
"""
import argparse
import json
import os
import random
import re
import string
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import requests

from cortex.config import cortex_home, memory_dir


# --- Configuration ---
def _env(key: str, default: str = '') -> str:
|
|
return os.environ.get(key, default)
|
|
|
|
|
|
def llm_url() -> str:
    """LLM API URL (OpenAI-compatible); overridable via CORTEX_LLM_URL."""
    # Defaults to a local Ollama instance.
    return os.environ.get('CORTEX_LLM_URL', 'http://localhost:11434/api/generate')
def llm_model() -> str:
    """LLM model to use; overridable via CORTEX_LLM_MODEL."""
    return os.environ.get('CORTEX_LLM_MODEL', 'mistral:7b')
def tracker_db_path() -> Path:
    """Location of the brain-tracker.json database under the memory dir."""
    return memory_dir().joinpath('brain-tracker.json')
def get_nats_bin() -> str:
    """Return the first existing nats binary path, else bare 'nats' ($PATH)."""
    candidates = (
        os.path.expanduser('~/bin/nats'),
        '/usr/local/bin/nats',
        '/usr/bin/nats',
    )
    return next((p for p in candidates if os.path.exists(p)), 'nats')
# --- Database ---

def load_db() -> dict:
    """Load the tracker database, returning an empty schema when absent."""
    path = tracker_db_path()
    if not path.exists():
        return {
            'commitments': [],
            'claims': [],
            'contradictions': [],
            'processedIds': set(),
            'lastRun': None,
        }
    data = json.loads(path.read_text())
    # processedIds is persisted as a JSON list; restore it to a set
    data['processedIds'] = set(data.get('processedIds', []))
    return data
def save_db(db: dict) -> None:
    """Persist the tracker database as pretty-printed JSON."""
    path = tracker_db_path()
    path.parent.mkdir(parents=True, exist_ok=True)

    # JSON has no set type; serialize processedIds as a list
    serializable = dict(db)
    serializable['processedIds'] = list(db['processedIds'])
    path.write_text(json.dumps(serializable, indent=2, ensure_ascii=False))
# --- LLM Integration ---

def query_llm(prompt: str, timeout: int = 25) -> str:
    """Query the configured LLM with *prompt* and return its text reply.

    The API flavor is inferred from the URL: paths containing
    '/api/generate' are treated as Ollama's native API, anything else as an
    OpenAI-compatible chat-completions endpoint.

    Returns '' on any failure; errors are reported on stderr so the caller
    can fall back to query_ollama_cli().
    """
    url = llm_url()
    model = llm_model()

    if '/api/generate' in url:
        # Ollama native API
        headers = None
        payload = {
            'model': model,
            'prompt': prompt,
            'stream': False,
            'options': {
                'temperature': 0.3,
                'num_predict': 500,
            },
        }
    else:
        # OpenAI-compatible chat completions API
        headers = {
            'Authorization': f"Bearer {_env('CORTEX_LLM_API_KEY', '')}",
            'Content-Type': 'application/json',
        }
        payload = {
            'model': model,
            'messages': [{'role': 'user', 'content': prompt}],
            'temperature': 0.3,
            'max_tokens': 500,
        }

    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        if response.status_code != 200:
            # Fix: non-200 replies previously returned '' with no diagnostic,
            # unlike the exception path below.
            print(f"⚠️ LLM error: HTTP {response.status_code}", file=sys.stderr)
            return ''
        body = response.json()
        if '/api/generate' in url:
            return body.get('response', '')
        return body['choices'][0]['message']['content']
    except Exception as e:
        # Network errors, timeouts, or malformed response bodies
        print(f"⚠️ LLM error: {e}", file=sys.stderr)
        return ''
def query_ollama_cli(prompt: str, timeout: int = 25) -> str:
    """Fallback: run *prompt* through the `ollama run` CLI."""
    try:
        proc = subprocess.run(
            ['ollama', 'run', llm_model()],
            input=prompt,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except Exception:
        # Best-effort fallback: a missing binary or timeout just yields ''
        return ''
    return proc.stdout or ''
# --- Event Fetching ---

def fetch_nats_events(subject: str, count: int = 100) -> list[dict]:
    """Fetch up to *count* recent events for *subject* from the
    'openclaw-events' JetStream stream via the nats CLI.

    Strategy: look up the stream's last sequence number for the subject,
    create a throwaway pull consumer starting *count* messages back, drain
    it, and remove it. Returns [] on any failure (reported on stderr).
    """
    nats_bin = get_nats_bin()
    events = []

    # Get last message to find sequence
    try:
        result = subprocess.run(
            [nats_bin, 'stream', 'get', 'openclaw-events', f'--last-for={subject}'],
            capture_output=True, text=True, timeout=5
        )

        # CLI output names the message as "openclaw-events#<seq>"
        match = re.search(r'openclaw-events#(\d+)', result.stdout or '')
        if not match:
            return []

        last_seq = int(match.group(1))
        start_seq = max(1, last_seq - count)  # clamp: stream sequences start at 1

        # Create ephemeral consumer (timestamp-named to avoid collisions)
        consumer_name = f'brain-{int(datetime.now().timestamp())}'

        # Remove any stale consumer with the same name; failure is ignored
        subprocess.run(
            [nats_bin, 'consumer', 'rm', 'openclaw-events', consumer_name, '--force'],
            capture_output=True, timeout=3
        )

        # Pull consumer, no acks, filtered to the subject, starting at start_seq
        subprocess.run(
            [nats_bin, 'consumer', 'add', 'openclaw-events', consumer_name,
             '--pull', f'--deliver={start_seq}', '--ack=none',
             f'--filter={subject}', '--defaults'],
            capture_output=True, timeout=5
        )

        # Drain up to `count` raw messages from the consumer
        result = subprocess.run(
            [nats_bin, 'consumer', 'next', 'openclaw-events', consumer_name,
             '--count', str(count), '--raw'],
            capture_output=True, text=True, timeout=15
        )

        # Clean up the ephemeral consumer
        subprocess.run(
            [nats_bin, 'consumer', 'rm', 'openclaw-events', consumer_name, '--force'],
            capture_output=True, timeout=3
        )

        # Keep only lines that look like JSON objects; skip CLI noise
        for line in (result.stdout or '').split('\n'):
            line = line.strip()
            if not line or not line.startswith('{'):
                continue
            try:
                events.append(json.loads(line))
            except json.JSONDecodeError:
                continue

    except Exception as e:
        print(f"⚠️ NATS error: {e}", file=sys.stderr)

    return events
def load_jsonl_events(source: str, hours: int = 24) -> list[dict]:
    """Load events from a JSONL file (or stdin when *source* is '-').

    Keeps events whose 'timestamp' (or 'timestampMs') in ms is within the
    last *hours*, plus events with no timestamp at all. Returns [] when the
    file does not exist.
    """
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    if source == '-':
        return _parse_jsonl_lines(sys.stdin, cutoff)

    path = Path(source)
    if not path.exists():
        return []
    # Fix: use a context manager (old code leaked the handle on exceptions)
    # and an explicit encoding (old code used the platform default).
    with path.open(encoding='utf-8') as fh:
        return _parse_jsonl_lines(fh, cutoff)


def _parse_jsonl_lines(lines, cutoff: int) -> list[dict]:
    """Parse JSONL lines, keeping events at/after *cutoff* (ms) or untimestamped."""
    events = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip malformed lines silently, as before
        ts = event.get('timestamp', event.get('timestampMs', 0))
        if ts == 0 or ts >= cutoff:
            events.append(event)
    return events
# --- Text Processing ---

def extract_text(event: dict) -> str:
    """Extract message text from an event, trying several payload layouts.

    Checks, in order: payload.text_preview[0].text, payload.data.text,
    payload.text. Returns '' when no text is found.
    """
    # Fix: guard against 'payload'/'data' keys that exist but hold None —
    # the old chained .get(..., {}) calls crashed with AttributeError then.
    payload = event.get('payload') or {}

    previews = payload.get('text_preview')
    if previews and isinstance(previews[0], dict):
        return previews[0].get('text', '')

    data = payload.get('data') or {}
    if data.get('text'):
        return data['text']

    if payload.get('text'):
        return payload['text']

    return ''
def clean_text(text: str) -> str:
    """Normalize raw message text and truncate to 250 chars for analysis."""
    substitutions = (
        (r'\[.*?\]', ''),        # bracketed [timestamps] / [message_id]
        (r'\|', ' '),            # table separators
        (r'[*_`#]', ''),         # markdown emphasis/heading chars
        (r'https?://\S+', ''),   # URLs
        (r'\n+', ' '),           # newlines to spaces
        (r'\s+', ' '),           # collapse whitespace runs
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()[:250]
# --- Analysis ---

def analyze_message(text: str, existing_claims: list[dict]) -> Optional[dict]:
    """Ask the LLM whether *text* contains commitments or factual claims.

    Returns the parsed JSON analysis, or None when the text is too short,
    looks like housekeeping noise, or the LLM reply contains no parseable
    JSON. (*existing_claims* is currently unused; kept for interface
    stability.)
    """
    clean = clean_text(text)
    if len(clean) < 20:
        return None
    if 'HEARTBEAT' in clean or 'cron:' in clean:
        return None

    prompt = f'''Analyze this message for:
1. Commitments/promises (someone agreeing to do something)
2. Factual claims (verifiable statements)

Message: "{clean}"

Respond with JSON only:
{{
"commitment": {{"found": false}} or {{"found": true, "what": "description", "who": "user/assistant", "deadline": "if mentioned"}},
"claims": [] or [{{"claim": "statement", "topic": "category", "entities": ["names"]}}]
}}'''

    response = query_llm(prompt) or query_ollama_cli(prompt)

    # Pull the first {...} blob out of the (possibly chatty) reply
    blob = re.search(r'\{[\s\S]*\}', response)
    if blob is None:
        return None
    try:
        return json.loads(blob.group(0))
    except json.JSONDecodeError:
        return None
def check_contradiction(new_claim: dict, existing_claims: list[dict]) -> Optional[dict]:
    """Check whether *new_claim* contradicts any related existing claim.

    Two claims are "related" when any entity name of one is a substring of
    an entity name of the other (case-insensitive). At most the first two
    related claims are sent to the LLM. Returns the LLM verdict merged with
    the conflicting claim under 'original', or None.
    """
    new_entities = [e.lower() for e in new_claim.get('entities', [])]

    def _is_related(old: dict) -> bool:
        old_entities = [e.lower() for e in old.get('entities', [])]
        return any(
            ne in oe or oe in ne
            for ne in new_entities
            for oe in old_entities
        )

    related = [c for c in existing_claims if _is_related(c)]
    if not related:
        return None

    for old in related[:2]:  # Check only first 2
        prompt = f'''Do these contradict?
Old ({old.get("date", "unknown")}): "{old.get("claim", "")}"
New: "{new_claim.get("claim", "")}"
Respond: {{"contradicts": true/false, "why": "explanation", "severity": "minor/major"}}'''

        response = query_llm(prompt) or query_ollama_cli(prompt)
        blob = re.search(r'\{[\s\S]*?\}', response)
        if blob is None:
            continue
        try:
            verdict = json.loads(blob.group(0))
        except json.JSONDecodeError:
            continue
        if verdict.get('contradicts'):
            return {**verdict, 'original': old}

    return None
# --- Commands ---

def _gen_id(prefix: str) -> str:
    """Generate an id of the form <prefix>-<unix seconds>-<4 random letters>."""
    suffix = ''.join(random.choices(string.ascii_lowercase, k=4))
    return f"{prefix}-{int(datetime.now().timestamp())}-{suffix}"


def cmd_scan(hours: int = 24, jsonl: Optional[str] = None) -> None:
    """Scan recent messages for commitments, claims, and contradictions.

    Events come from a JSONL file when *jsonl* is given, otherwise from
    NATS. Each unprocessed event within the last *hours* is analyzed with
    the LLM; findings are appended to the tracker DB, which is saved
    periodically and once at the end.
    """
    db = load_db()
    cutoff = int((datetime.now() - timedelta(hours=hours)).timestamp() * 1000)

    print(f"🧠 Brain Tracker — Scanning last {hours}h\n", file=sys.stderr)

    # Fetch events
    if jsonl:
        all_events = load_jsonl_events(jsonl, hours)
    else:
        print("📥 Fetching messages...", file=sys.stderr)
        in_events = fetch_nats_events('openclaw.events.main.conversation_message_in', 200)
        out_events = fetch_nats_events('openclaw.events.main.conversation_message_out', 200)
        all_events = in_events + out_events

    # Keep only unprocessed events inside the window, oldest first
    events = [
        e for e in all_events
        if e.get('timestamp', 0) >= cutoff and e.get('id') not in db['processedIds']
    ]
    events.sort(key=lambda x: x.get('timestamp', 0))

    print(f" Found {len(events)} new events to analyze\n", file=sys.stderr)

    commitments = 0
    claims = 0
    contradictions = 0

    for i, event in enumerate(events):
        text = extract_text(event)
        clean = clean_text(text)

        # Too short to contain anything trackable
        if len(clean) < 20:
            db['processedIds'].add(event.get('id', ''))
            continue

        print(f"\r[{i+1}/{len(events)}] Analyzing...", end='', file=sys.stderr)

        analysis = analyze_message(text, db['claims'])

        if not analysis:
            db['processedIds'].add(event.get('id', ''))
            continue

        date = datetime.fromtimestamp(event.get('timestamp', 0) / 1000).strftime('%Y-%m-%d')

        # Process commitment
        # Fix: random/string were previously imported inside this loop on
        # every matching event; id generation is now the _gen_id helper.
        if analysis.get('commitment', {}).get('found'):
            c = analysis['commitment']
            db['commitments'].append({
                'id': _gen_id('commit'),
                'what': c.get('what', ''),
                'who': c.get('who', ''),
                'deadline': c.get('deadline'),
                'source': clean[:100],
                'date': date,
                'status': 'open',
            })
            commitments += 1
            print(f"\n✅ Commitment: \"{c.get('what', '')}\"", file=sys.stderr)

        # Process claims
        for claim in analysis.get('claims', []):
            if not claim.get('claim'):
                continue

            new_claim = {
                'id': _gen_id('claim'),
                'claim': claim['claim'],
                'topic': claim.get('topic', 'general'),
                'entities': claim.get('entities', []),
                'source': clean[:100],
                'date': date,
            }

            # Check for contradictions against everything seen so far
            contra = check_contradiction(new_claim, db['claims'])
            if contra:
                db['contradictions'].append({
                    'id': f"contra-{int(datetime.now().timestamp())}",
                    'new_claim': new_claim,
                    'old_claim': contra['original'],
                    'why': contra.get('why', ''),
                    'severity': contra.get('severity', 'unknown'),
                    'date': date,
                    'status': 'unresolved',
                })
                contradictions += 1
                print(f"\n⚠️ CONTRADICTION: {contra.get('why', '')}", file=sys.stderr)

            db['claims'].append(new_claim)
            claims += 1

        db['processedIds'].add(event.get('id', ''))

        # Save periodically so progress survives interruption
        if i % 10 == 0:
            save_db(db)

    db['lastRun'] = datetime.now().isoformat()
    save_db(db)

    print(f"\n\n📊 Results:", file=sys.stderr)
    print(f" New Commitments: {commitments}", file=sys.stderr)
    print(f" New Claims: {claims}", file=sys.stderr)
    print(f" Contradictions: {contradictions}", file=sys.stderr)
    print(f" Total in DB: {len(db['commitments'])} commitments, {len(db['claims'])} claims", file=sys.stderr)
def cmd_list() -> None:
    """Print open commitments and unresolved contradictions from the DB."""
    db = load_db()

    open_commits = [c for c in db.get('commitments', []) if c.get('status') == 'open']
    unresolved = [c for c in db.get('contradictions', []) if c.get('status') == 'unresolved']

    print("🧠 Brain Tracker Status\n")

    if not open_commits:
        print("✅ No open commitments\n")
    else:
        print(f"📋 Open Commitments ({len(open_commits)}):\n")
        for item in open_commits:
            # Show only the id suffix — enough to pass to `done`
            print(f" [{item['id'][-6:]}] {item.get('what', '')}")
            deadline = f" | ⏰ {item['deadline']}" if item.get('deadline') else ''
            print(f" 👤 {item.get('who', 'unknown')} | 📅 {item.get('date', '')}{deadline}\n")

    if not unresolved:
        print("✅ No contradictions\n")
    else:
        print(f"⚠️ Contradictions ({len(unresolved)}):\n")
        for item in unresolved:
            severity = item.get('severity', 'UNKNOWN').upper()
            print(f" [{severity}] {item.get('why', '')}")
            print(f" Old: \"{item.get('old_claim', {}).get('claim', '')}\"")
            print(f" New: \"{item.get('new_claim', {}).get('claim', '')}\"\n")

    print(f"📊 Total: {len(db.get('claims', []))} claims, {len(db.get('commitments', []))} commitments")
def cmd_done(id_fragment: str) -> None:
    """Mark the first commitment whose id contains *id_fragment* as done."""
    db = load_db()

    for commit in db['commitments']:
        if id_fragment in commit['id']:
            commit['status'] = 'done'
            commit['completed'] = datetime.now().isoformat()
            save_db(db)
            print(f"✅ Marked done: {commit.get('what', '')}")
            return

    print(f"❌ Not found: {id_fragment}")
def cmd_check(statement: str) -> None:
    """Search stored claims for *statement* (case-insensitive substring)."""
    db = load_db()
    needle = statement.lower()

    def _matches(claim: dict) -> bool:
        if needle in claim.get('claim', '').lower():
            return True
        return any(needle in entity.lower() for entity in claim.get('entities', []))

    matches = [c for c in db.get('claims', []) if _matches(c)]

    if not matches:
        print(f"No claims matching: {statement}")
        return

    print(f"🔍 Claims matching \"{statement}\" ({len(matches)}):\n")
    for c in matches[-15:]:  # at most the 15 most recent matches
        print(f"[{c.get('date', '')}] {c.get('claim', '')}")
        if c.get('entities'):
            print(f" Entities: {', '.join(c['entities'])}")
        print()
# --- Main ---

def main():
    """CLI entry point: parse arguments and dispatch to a cmd_* function."""
    parser = argparse.ArgumentParser(
        description='Commitment and Contradiction Detection',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Commands:
scan Analyze recent messages for commitments and claims
list Show open commitments and contradictions
done Mark a commitment as completed
check Search claims for a statement

Examples:
cortex track scan --since 24h
cortex track list
cortex track done commit-abc123
cortex track check "TypeDB"
'''
    )

    sub = parser.add_subparsers(dest='command')

    # scan
    scan_p = sub.add_parser('scan', help='Scan messages for commitments/claims')
    scan_p.add_argument('--since', default='24h',
                        help='Time window (e.g. 24h, 7d)')
    scan_p.add_argument('--jsonl', metavar='FILE',
                        help='Load events from JSONL file')

    # list
    sub.add_parser('list', help='Show open items')

    # done
    done_p = sub.add_parser('done', help='Mark commitment as done')
    done_p.add_argument('id', help='Commitment ID (or fragment)')

    # check
    check_p = sub.add_parser('check', help='Search claims')
    check_p.add_argument('statement', nargs='+', help='Statement to search')

    args = parser.parse_args()

    if args.command == 'scan':
        # Parse duration like "24h", "7d", "90m"
        match = re.match(r'^(\d+)([hdm])$', args.since.lower())
        if not match:
            print(f"❌ Invalid duration: {args.since}", file=sys.stderr)
            sys.exit(1)

        value = int(match.group(1))
        unit = match.group(2)
        if unit == 'h':
            hours = value
        elif unit == 'd':
            hours = value * 24
        else:
            # Fix: minutes must round UP — int(value / 60) truncated
            # sub-hour windows (e.g. "30m") to 0 hours, scanning nothing.
            hours = -(-value // 60)

        cmd_scan(hours, args.jsonl)

    elif args.command == 'list':
        cmd_list()

    elif args.command == 'done':
        cmd_done(args.id)

    elif args.command == 'check':
        cmd_check(' '.join(args.statement))

    else:
        parser.print_help()
# Script entry point: allows running this module directly.
if __name__ == '__main__':
    main()