#!/usr/bin/env python3
"""Preference & Learning Pair Extractor for Darkplex.

Extracts training signal from session transcripts in multiple categories:

1. **DPO Pairs** (correction → preference): prompt + chosen + rejected
   - Hard corrections: "nein, falsch, das stimmt nicht"
   - Soft redirects: "lass uns lieber...", "eher so..."

2. **SFT Pairs** (positive reinforcement → good examples): prompt + response
   - Positive signals: "super", "genau so", "perfekt", 👍
   - These are responses worth reinforcing

3. **Teaching Pairs** (knowledge transfer → learning): context + lesson
   - Albert explains something new
   - "wir haben mal gesagt...", "das ist weil...", "denk dran..."

Usage:
    python -m cortex.dpo_extractor --since 7d
    python -m cortex.dpo_extractor --since 30d --output ~/clawd/training-data/dpo-pairs.json
    python -m cortex.dpo_extractor --dry-run --since 7d
"""

import argparse
import asyncio
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

# --- Correction Detection ---

# Explicit correction: user tells Claudia she's wrong or should do something
# differently. Each entry is (regex, label, confidence).
CORRECTION_PATTERNS = [
    # Direct negation + instruction
    (r'(?:^|\s)nein[,.]?\s+(?:du\s+sollst|ich\s+(?:wollte|meinte|meine))', 'direct_correction', 0.9),
    (r'nicht\s+so[,.]?\s+(?:sondern|ich\s+(?:meinte|wollte))', 'redirect', 0.9),
    (r'das\s+(?:stimmt|ist)\s+(?:nicht|falsch)', 'factual_correction', 0.85),
    (r'(?:^|\s)falsch[.!]', 'wrong', 0.85),
    # Implicit correction: frustration + redirect
    (r'(?:bist\s+du\s+)?bescheuert\??', 'frustration_correction', 0.95),
    (r'(?:du\s+hast\s+(?:es\s+)?(?:vergessen|übersehen))', 'forgotten', 0.8),
    (r'ich\s+(?:hab|habe)\s+(?:doch\s+)?gesagt', 'repeated_instruction', 0.85),
    (r'(?:das\s+)?(?:war|ist)\s+(?:nicht\s+(?:das|was)\s+ich|falsch|quatsch)', 'rejection', 0.85),
    (r'(?:nochmal|noch\s*mal)[,:]?\s+(?:ich|du|wir|das)', 'retry_request', 0.7),
    # Mild corrections
    (r'(?:naja|nee|hmm)[,.]?\s+(?:eher|lieber|besser|anders)', 'mild_redirect', 0.7),
    (r'(?:finds?|finde)\s+(?:ich\s+)?(?:blöd|schlecht|nicht\s+gut)', 'negative_feedback', 0.75),
    (r'du\s+(?:solltest|musst|kannst)\s+(?:eher|lieber|besser)', 'should_redirect', 0.7),
]

# Anti-patterns: things that look like corrections but aren't
FALSE_POSITIVE_PATTERNS = [
    r'^system:',                        # System messages
    r'heartbeat',                       # Heartbeat
    r'^\[media\s+attached',             # Media attachments
    r'^\[cron:',                        # Cron messages
    r'^pre-compaction',                 # Memory flush
    r'exec\s+(?:completed|failed)',     # Exec results
    r'^read\s+heartbeat',               # Heartbeat instructions
    r'^a\s+subagent\s+task',            # Sub-agent completion
    r'^creating\s+(?:config|symlink)',  # System output
    r'apt\.conf',                       # Package manager output
]

# Compile patterns
CORRECTION_RE = [
    (re.compile(p, re.IGNORECASE | re.MULTILINE), label, conf)
    for p, label, conf in CORRECTION_PATTERNS
]
FALSE_POSITIVE_RE = [re.compile(p, re.IGNORECASE) for p in FALSE_POSITIVE_PATTERNS]

# --- Positive Reinforcement Detection ---

POSITIVE_PATTERNS = [
    # Explicit praise
    (r'(?:^|\s)(?:super|perfekt|genau\s*so|klasse|toll|prima|excellent|great|nice)[\s!\.]*$', 'praise', 0.85),
    (r'(?:^|\s)(?:genau|exactly|perfect|richtig)[\s!\.]*$', 'affirmation', 0.8),
    (r'(?:das|so)\s+(?:ist|war)\s+(?:super|gut|perfekt|genau\s+richtig)', 'quality_praise', 0.85),
    (r'(?:gefällt|mag)\s+(?:mir|ich)', 'preference_positive', 0.75),
    (r'(?:gut|super|toll)\s+gemacht', 'task_praise', 0.9),
    (r'👍|👏|🙌|❤️|🔥|💯|✅', 'emoji_positive', 0.7),
    # Implicit positive: user builds on the response
    (r'(?:ja|ok|alles\s*klar)[,.]?\s+(?:und\s+jetzt|dann|mach\s+(?:mal|jetzt|weiter))', 'accept_continue', 0.7),
]

POSITIVE_RE = [
    (re.compile(p, re.IGNORECASE | re.MULTILINE), label, conf)
    for p, label, conf in POSITIVE_PATTERNS
]

# --- Teaching/Knowledge Transfer Detection ---

TEACHING_PATTERNS = [
    # Albert explains why/how
    (r'(?:wir\s+haben\s+(?:mal\s+)?gesagt|wir\s+machen\s+das\s+(?:so|weil))', 'established_rule', 0.85),
    (r'(?:das\s+ist\s+weil|der\s+grund\s+(?:ist|dafür))', 'explanation', 0.8),
    (r'(?:denk\s+dran|vergiss\s+nicht|wichtig(?:\s+ist)?:)', 'reminder_teaching', 0.85),
    (r'(?:du\s+(?:hast|hattest)\s+(?:das\s+)?(?:schon\s+)?(?:mal\s+)?(?:gemacht|installiert|gebaut))', 'prior_knowledge', 0.8),
    (r'(?:das\s+(?:problem|thema)\s+ist\s+(?:ja\s+)?(?:dass|weil|-))', 'problem_framing', 0.75),
    (r'(?:ich\s+(?:will|möchte|hätte\s+gerne)\s+(?:dass|lieber|eher))', 'preference_statement', 0.8),
    (r'(?:die\s+(?:regel|strategie|idee)\s+ist)', 'strategy_teaching', 0.85),
    # Albert shares context Claudia should remember
    (r'(?:(?:zur\s+)?info:|fyi:?|heads\s*up:?)', 'info_sharing', 0.7),
]

TEACHING_RE = [
    (re.compile(p, re.IGNORECASE | re.MULTILINE), label, conf)
    for p, label, conf in TEACHING_PATTERNS
]

# --- Soft Redirect Detection (milder than corrections) ---

SOFT_REDIRECT_PATTERNS = [
    (r'(?:lass\s+uns\s+(?:lieber|eher|besser|mal))', 'lets_rather', 0.75),
    (r'(?:ich\s+würde?\s+(?:eher|lieber|besser))', 'i_would_rather', 0.7),
    (r'(?:können?\s+wir\s+(?:nicht\s+)?(?:lieber|eher|stattdessen))', 'can_we_instead', 0.7),
    (r'(?:anderer?\s+(?:plan|idee|ansatz|weg|vorschlag))', 'alternative_plan', 0.75),
    (r'(?:wechsel(?:n)?\s+(?:wir|mal)\s+(?:zu|auf|den))', 'switch_to', 0.7),
]

SOFT_REDIRECT_RE = [
    (re.compile(p, re.IGNORECASE | re.MULTILINE), label, conf)
    for p, label, conf in SOFT_REDIRECT_PATTERNS
]

# Minimum lengths
MIN_PROMPT_LEN = 15       # User prompt must be meaningful
MIN_RESPONSE_LEN = 80     # Assistant response must be substantive
MIN_CORRECTION_LEN = 20   # Correction must explain what's wrong
MIN_TEACHING_LEN = 30     # Teaching must have substance

# Maximum length of a message that can still count as praise
MAX_PRAISE_LEN = 200      # Long messages are rarely just praise


def _best_match(
    text: str,
    compiled_patterns: list,
    *,
    min_len: Optional[int] = None,
    max_len: Optional[int] = None,
) -> Optional[tuple[str, float]]:
    """Return the highest-confidence (label, confidence) matching *text*.

    The text is cleaned with ``clean_user_text`` first; the false-positive
    filter and the length bounds are applied to the cleaned text. On equal
    confidence the pattern listed first wins. Returns None when the text is
    filtered out or no pattern matches.
    """
    clean = clean_user_text(text)
    if is_false_positive(clean):
        return None
    if min_len is not None and len(clean) < min_len:
        return None
    if max_len is not None and len(clean) > max_len:
        return None

    best_match = None
    best_conf = 0.0
    for pattern, label, conf in compiled_patterns:
        # Only run the regex when it could actually improve the result.
        if conf > best_conf and pattern.search(clean):
            best_match = (label, conf)
            best_conf = conf
    return best_match


def detect_positive(text: str) -> Optional[tuple[str, float]]:
    """Detect if user message is positive reinforcement.

    Returns (label, confidence) or None. Messages whose cleaned text is
    longer than MAX_PRAISE_LEN characters are never treated as praise.
    """
    return _best_match(text, POSITIVE_RE, max_len=MAX_PRAISE_LEN)


def detect_teaching(text: str) -> Optional[tuple[str, float]]:
    """Detect if user message is a teaching/knowledge transfer moment.

    Returns (label, confidence) or None. Cleaned text shorter than
    MIN_TEACHING_LEN is ignored.
    """
    return _best_match(text, TEACHING_RE, min_len=MIN_TEACHING_LEN)


def detect_soft_redirect(text: str) -> Optional[tuple[str, float]]:
    """Detect if user message is a soft redirect (mild preference signal).

    Returns (label, confidence) or None. Cleaned text shorter than
    MIN_CORRECTION_LEN is ignored.
    """
    return _best_match(text, SOFT_REDIRECT_RE, min_len=MIN_CORRECTION_LEN)


def is_false_positive(text: str) -> bool:
    """Check if text matches false positive patterns."""
    return any(p.search(text) for p in FALSE_POSITIVE_RE)


def detect_correction(text: str) -> Optional[tuple[str, float]]:
    """Detect if user message is a correction.

    Returns (label, confidence) or None. The text is cleaned first and the
    false-positive check runs on the cleaned text; cleaned text shorter than
    MIN_CORRECTION_LEN is ignored.
    """
    return _best_match(text, CORRECTION_RE, min_len=MIN_CORRECTION_LEN)


# --- Event Parsing ---

def extract_user_text(event: dict) -> Optional[str]:
    """Extract user text from a conversation.message.in event.

    Returns None for any other event type or when no text field is present.
    """
    if event.get('type') != 'conversation.message.in':
        return None

    payload = event.get('payload', {})

    # text_preview format (most common)
    preview = payload.get('text_preview')
    if isinstance(preview, list) and preview:
        return preview[0].get('text', '')

    # Direct content
    if 'content' in payload:
        return payload['content']

    return None


def extract_assistant_text(event: dict) -> Optional[str]:
    """Extract assistant text from a conversation.message.out event.

    Returns None for any other event type or when no text field is present.
    """
    if event.get('type') != 'conversation.message.out':
        return None

    payload = event.get('payload', {})

    data = payload.get('data')
    if isinstance(data, dict) and 'text' in data:
        return data['text']

    if 'content' in payload:
        return payload['content']

    return None


def get_session_id(event: dict) -> str:
    """Extract session identifier from event.

    Outgoing messages prefer ``payload['runId']``; otherwise
    ``payload['sessionId']`` is used, then the top-level ``session`` field,
    and finally the literal ``'unknown'``.
    """
    payload = event.get('payload', {})
    if event.get('type') == 'conversation.message.out' and payload.get('runId'):
        return payload['runId']
    if payload.get('sessionId'):
        return payload['sessionId']
    return event.get('session', 'unknown')


def clean_user_text(text: str) -> str:
    """Strip metadata from user message, return the actual content."""
    # Remove System: [timestamp] Matrix message from X: prefix
    text = re.sub(
        r'^System:\s*\[.*?\]\s*(?:Matrix\s+message\s+from\s+\w+:\s*)?', '', text
    ).strip()
    # Remove [Day YYYY-MM-DD HH:MM TZ] or [YYYY-MM-DD ...] timestamp prefix
    text = re.sub(r'^\[(?:\w+\s+)?\d{4}-\d{2}-\d{2}[^\]]*\]\s*', '', text).strip()
    # Remove [Matrix user ...] prefix
    text = re.sub(r'^\[Matrix\s+\w+[^\]]*\]\s*', '', text).strip()
    # Remove message_id lines (anywhere in the text, not only as a prefix)
    text = re.sub(r'\[message_id:.*?\]', '', text).strip()
    return text


# --- DPO Pair Construction ---

def build_dpo_pair(
    prompt_event: dict,
    rejected_event: dict,
    correction_event: dict,
    correction_label: str,
    correction_confidence: float,
) -> Optional[dict]:
    """Build a DPO training pair from a correction sequence.

    Returns dict with: prompt, chosen, rejected, correction, metadata.
    ``chosen`` is always None here and is meant to be filled in by the caller
    with the assistant's post-correction response. Returns None when the
    prompt, the rejected response or the correction is too short.
    """
    prompt_text = clean_user_text(extract_user_text(prompt_event) or '')
    rejected_text = extract_assistant_text(rejected_event) or ''
    correction_text = clean_user_text(extract_user_text(correction_event) or '')

    # Validate lengths
    if len(prompt_text) < MIN_PROMPT_LEN:
        return None
    if len(rejected_text) < MIN_RESPONSE_LEN:
        return None
    if len(correction_text) < MIN_CORRECTION_LEN:
        return None

    # The correction only signals that the rejected response was not wanted;
    # DPO training needs an actual better response. After a correction the
    # assistant usually gives one, and the caller looks it up.
    return {
        'prompt': prompt_text,
        'rejected': rejected_text,
        'chosen': None,  # To be filled by caller with post-correction response
        'correction': correction_text,
        'metadata': {
            'correction_type': correction_label,
            'confidence': correction_confidence,
            'prompt_seq': prompt_event.get('seq'),
            'rejected_seq': rejected_event.get('seq'),
            'correction_seq': correction_event.get('seq'),
            'session': get_session_id(prompt_event),
            'timestamp': correction_event.get('timestamp'),
        },
    }


# --- Session Transcript Parsing ---

def load_session_transcripts(sessions_dir: str, since_hours: float = 168) -> list[dict]:
    """Load conversation messages from OpenClaw session JSONL files.

    Only ``*.jsonl`` files modified within the last *since_hours* hours are
    read (fractional hours are allowed). Lines that are not valid JSON, are
    not JSON objects, are not of type ``message``, or carry no text for a
    user/assistant role are skipped.

    Returns a flat list of events with 'type', 'role', 'text', 'session',
    'timestamp', 'seq' and a 'payload' shaped so that ``extract_user_text``
    and ``extract_assistant_text`` can read them.
    """
    sessions_path = Path(sessions_dir)
    if not sessions_path.exists():
        print(f" ⚠️ Sessions dir not found: {sessions_dir}", file=sys.stderr)
        return []

    cutoff_time = time.time() - (since_hours * 3600)
    events = []
    files_processed = 0

    for jsonl_file in sorted(sessions_path.glob('*.jsonl')):
        # Skip old files
        if jsonl_file.stat().st_mtime < cutoff_time:
            continue

        session_id = jsonl_file.stem
        seq = 0

        try:
            # Explicit UTF-8: transcripts contain umlauts and emoji and must
            # not depend on the platform's locale encoding.
            with open(jsonl_file, encoding='utf-8') as fh:
                for line in fh:
                    try:
                        entry = json.loads(line)
                    except json.JSONDecodeError:
                        continue

                    # A valid JSON line that is not an object cannot be a
                    # message entry; skip it instead of aborting the file.
                    if not isinstance(entry, dict):
                        continue
                    if entry.get('type') != 'message':
                        continue

                    msg = entry.get('message', {})
                    role = msg.get('role', '')
                    if role not in ('user', 'assistant'):
                        continue

                    # Extract text content
                    content = msg.get('content', '')
                    if isinstance(content, list):
                        content = '\n'.join(
                            part.get('text', '')
                            for part in content
                            if isinstance(part, dict) and part.get('type') == 'text'
                        )
                    if not content:
                        continue

                    events.append({
                        'type': f'conversation.message.{"in" if role == "user" else "out"}',
                        'role': role,
                        'text': content,
                        'session': session_id,
                        'timestamp': entry.get('timestamp', 0),
                        'seq': seq,
                        'payload': {
                            'text_preview': [{'type': 'text', 'text': content}] if role == 'user' else {},
                            'data': {'text': content} if role == 'assistant' else {},
                            'sessionId': session_id,
                        },
                    })
                    seq += 1
            files_processed += 1
        except Exception as e:
            # Best effort: one unreadable file must not stop the whole scan.
            print(f" ⚠️ Error reading {jsonl_file.name}: {e}", file=sys.stderr)

    print(f" Loaded {len(events)} messages from {files_processed} session files", file=sys.stderr)
    return events


# --- NATS Fetching ---

def fetch_events_by_sequence(start_seq: int, end_seq: int, batch_size: int = 500) -> list[dict]:
    """Fetch events from NATS by sequence range using nats-py (fast) or CLI (fallback).

    The CLI fallback is used when the async fetch raises or returns no
    events. ``batch_size`` is currently unused and kept only so existing
    callers keep working.
    """
    # Try fast async fetch first
    try:
        events = asyncio.run(_fetch_events_async(start_seq, end_seq))
        if events:
            return events
    except Exception as e:
        print(f" Async fetch failed ({e}), falling back to CLI", file=sys.stderr)

    return _fetch_events_cli(start_seq, end_seq)


async def _fetch_events_async(start_seq: int, end_seq: int) -> list[dict]:
    """Fast bulk fetch using nats-py consumer.

    Connects to ``nats://localhost:4222`` with credentials taken from the
    NATS_USER / NATS_PASSWORD environment variables and reads messages on
    ``openclaw.events.>`` starting at *start_seq*. Reading stops after
    ``end_seq - start_seq + 1`` messages or when no message arrives within
    two seconds. The ``seq`` stored on each event is ``start_seq`` plus the
    message's position in this fetch, i.e. an approximation of the stream
    sequence.
    """
    # Imported lazily so the module works without nats-py installed.
    import nats as nats_lib
    from nats.js.api import ConsumerConfig, DeliverPolicy

    user = os.environ.get('NATS_USER', 'claudia')
    password = os.environ.get('NATS_PASSWORD', '')

    nc = await nats_lib.connect(
        'nats://localhost:4222',
        user=user,
        password=password,
    )
    events = []
    try:
        js = nc.jetstream()

        # Create ephemeral ordered consumer starting at our sequence
        sub = await js.subscribe(
            'openclaw.events.>',
            ordered_consumer=True,
            config=ConsumerConfig(
                deliver_policy=DeliverPolicy.BY_START_SEQUENCE,
                opt_start_seq=start_seq,
            ),
        )
        try:
            wanted = end_seq - start_seq + 1
            count = 0
            while True:
                try:
                    msg = await asyncio.wait_for(sub.next_msg(), timeout=2.0)
                except asyncio.TimeoutError:
                    break

                try:
                    event = json.loads(msg.data.decode())
                except (json.JSONDecodeError, UnicodeDecodeError):
                    event = None

                # Undecodable or non-object payloads still consume a slot so
                # the approximate sequence numbering stays aligned.
                if isinstance(event, dict):
                    event['seq'] = count + start_seq
                    events.append(event)
                count += 1

                if isinstance(event, dict) and count % 1000 == 0:
                    print(f" Fetched {count} events...", file=sys.stderr, end='\r')

                # Stop at end_seq (approximate via count)
                if isinstance(event, dict) and count >= wanted:
                    break
        finally:
            await sub.unsubscribe()
    finally:
        # Always release the connection, even if subscribing or
        # unsubscribing failed.
        await nc.drain()

    print(f" Fetched {len(events)} events (async) ", file=sys.stderr)
    return events


def _fetch_events_cli(start_seq: int, end_seq: int) -> list[dict]:
    """Fallback: fetch events one by one via nats CLI.

    Runs ``nats stream get openclaw-events <seq>`` for every sequence number
    and takes the first output line starting with ``{`` as the event JSON.
    Gives up after more than 50 failed calls.
    """
    events = []
    errors = 0

    for seq in range(start_seq, end_seq + 1):
        try:
            result = subprocess.run(
                ['nats', 'stream', 'get', 'openclaw-events', str(seq)],
                capture_output=True, text=True, timeout=5,
            )
            for line in result.stdout.split('\n'):
                if line.startswith('{'):
                    event = json.loads(line)
                    event['seq'] = seq
                    events.append(event)
                    break
        except Exception:
            # Best effort per sequence: count the failure and carry on.
            errors += 1
            if errors > 50:
                print(f" ⚠️ Too many errors ({errors}), stopping", file=sys.stderr)
                break

        if (seq - start_seq) % 200 == 0 and seq > start_seq:
            print(f" Fetched {seq - start_seq}/{end_seq - start_seq} events...",
                  file=sys.stderr, end='\r')

    print(f" Fetched {len(events)} events ({errors} errors) ", file=sys.stderr)
    return events


def get_stream_info() -> dict:
    """Get NATS stream info.

    Runs ``nats stream info openclaw-events --json`` and returns the parsed
    JSON. Raises ``json.JSONDecodeError`` if the command prints no valid
    JSON (for example because it failed).
    """
    result = subprocess.run(
        ['nats', 'stream', 'info', 'openclaw-events', '--json'],
        capture_output=True, text=True, timeout=10,
    )
    return json.loads(result.stdout)


# --- Main Extraction Pipeline ---

def _build_conversation(events: list[dict]) -> list[tuple[str, str, dict]]:
    """Build conversation sequence from events: list of (role, text, event).

    Events without user or assistant text are dropped. Consecutive assistant
    messages collapse into one turn holding the longest text of the streak.
    """
    conversation = []
    for event in events:
        user_text = extract_user_text(event)
        if user_text:
            conversation.append(('user', user_text, event))
            continue

        asst_text = extract_assistant_text(event)
        if not asst_text:
            continue

        # Keep the longest assistant response in a streak
        if conversation and conversation[-1][0] == 'assistant':
            if len(asst_text) > len(conversation[-1][1]):
                conversation[-1] = ('assistant', asst_text, event)
        else:
            conversation.append(('assistant', asst_text, event))
    return conversation


def _chosen_after(conversation: list, correction_idx: int) -> Optional[str]:
    """Return the assistant reply right after *correction_idx*, if substantive.

    Returns None when there is no following assistant turn or its text is
    shorter than MIN_RESPONSE_LEN.
    """
    nxt = correction_idx + 1
    if nxt < len(conversation) and conversation[nxt][0] == 'assistant':
        candidate = conversation[nxt][1]
        if len(candidate) >= MIN_RESPONSE_LEN:
            return candidate
    return None


def extract_all_signals(events: list[dict], verbose: bool = False) -> dict:
    """Extract ALL learning signals from events.

    Events are grouped by session and ordered by ``seq``. Every
    user → assistant → user triple is inspected, and the second user message
    is classified as the first matching category of: hard correction, soft
    redirect, positive reinforcement, teaching.

    Returns dict with:
    - dpo_pairs: hard correction DPO pairs (prompt + chosen + rejected)
    - soft_redirect_pairs: soft redirect DPO pairs
    - sft_pairs: positively reinforced exchanges (prompt + good response)
    - teaching_pairs: knowledge transfer moments (context + lesson)
    - stats: extraction statistics
    """
    # Group by session
    sessions: dict[str, list[dict]] = {}
    for event in events:
        sessions.setdefault(get_session_id(event), []).append(event)

    dpo_pairs = []
    soft_redirect_pairs = []
    sft_pairs = []
    teaching_pairs = []

    stats = {
        'sessions': 0,
        'corrections_found': 0,
        'dpo_pairs_built': 0,
        'dpo_with_chosen': 0,
        'soft_redirects_found': 0,
        'soft_redirect_pairs_built': 0,
        'positives_found': 0,
        'sft_pairs_built': 0,
        'teachings_found': 0,
        'teaching_pairs_built': 0,
    }

    for sid, session_events in sessions.items():
        session_events.sort(key=lambda e: e.get('seq', 0))
        stats['sessions'] += 1

        conversation = _build_conversation(session_events)

        for i in range(len(conversation) - 2):
            # === Pattern: user → assistant → user(signal) → assistant(better) ===
            if not (conversation[i][0] == 'user'
                    and conversation[i + 1][0] == 'assistant'
                    and conversation[i + 2][0] == 'user'):
                continue

            prompt_turn = conversation[i]
            response_turn = conversation[i + 1]
            signal_turn = conversation[i + 2]
            user_text = signal_turn[1]

            # Hard correction
            correction_result = detect_correction(user_text)
            if correction_result:
                label, confidence = correction_result
                stats['corrections_found'] += 1

                pair = build_dpo_pair(
                    prompt_event=prompt_turn[2],
                    rejected_event=response_turn[2],
                    correction_event=signal_turn[2],
                    correction_label=label,
                    correction_confidence=confidence,
                )
                if pair:
                    chosen_text = _chosen_after(conversation, i + 2)
                    if chosen_text is not None:
                        pair['chosen'] = chosen_text
                        stats['dpo_with_chosen'] += 1
                    stats['dpo_pairs_built'] += 1
                    dpo_pairs.append(pair)
                    if verbose:
                        print(f"\n 🔴 CORRECTION [{label}] conf={confidence:.0%}", file=sys.stderr)
                        print(f" prompt: {pair['prompt'][:80]}...", file=sys.stderr)
                # A message classified as a correction is not checked for
                # the weaker signal types.
                continue

            # Soft redirect
            redirect_result = detect_soft_redirect(user_text)
            if redirect_result:
                label, confidence = redirect_result
                stats['soft_redirects_found'] += 1

                pair = build_dpo_pair(
                    prompt_event=prompt_turn[2],
                    rejected_event=response_turn[2],
                    correction_event=signal_turn[2],
                    correction_label=f'soft_{label}',
                    correction_confidence=confidence,
                )
                if pair:
                    chosen_text = _chosen_after(conversation, i + 2)
                    if chosen_text is not None:
                        pair['chosen'] = chosen_text
                    stats['soft_redirect_pairs_built'] += 1
                    soft_redirect_pairs.append(pair)
                    if verbose:
                        print(f"\n 🟡 REDIRECT [{label}] conf={confidence:.0%}", file=sys.stderr)
                        print(f" redirect: {clean_user_text(user_text)[:80]}...", file=sys.stderr)
                continue

            # Positive reinforcement: user praises → previous exchange is good
            positive_result = detect_positive(user_text)
            if positive_result:
                label, confidence = positive_result
                stats['positives_found'] += 1

                prompt_text = clean_user_text(prompt_turn[1])
                response_text = response_turn[1]

                if len(prompt_text) >= MIN_PROMPT_LEN and len(response_text) >= MIN_RESPONSE_LEN:
                    sft_pairs.append({
                        'prompt': prompt_text,
                        'response': response_text,
                        'signal_type': 'positive_reinforcement',
                        'signal_label': label,
                        'confidence': confidence,
                        'metadata': {
                            'session': sid,
                            'prompt_seq': prompt_turn[2].get('seq'),
                            'response_seq': response_turn[2].get('seq'),
                            'timestamp': signal_turn[2].get('timestamp'),
                            'praise_text': clean_user_text(user_text)[:200],
                        },
                    })
                    stats['sft_pairs_built'] += 1
                    if verbose:
                        print(f"\n 🟢 POSITIVE [{label}] conf={confidence:.0%}", file=sys.stderr)
                        print(f" prompt: {prompt_text[:80]}...", file=sys.stderr)
                continue

            # Teaching moment: user teaches something
            teaching_result = detect_teaching(user_text)
            if teaching_result:
                label, confidence = teaching_result
                stats['teachings_found'] += 1

                cleaned_teaching = clean_user_text(user_text)
                # Context is what came before (the assistant response that
                # triggered the teaching).
                context = response_turn[1]

                if len(cleaned_teaching) >= MIN_TEACHING_LEN:
                    teaching_pairs.append({
                        'lesson': cleaned_teaching,
                        'context': context[:500] if context else '',
                        'signal_type': 'teaching',
                        'signal_label': label,
                        'confidence': confidence,
                        'metadata': {
                            'session': sid,
                            'seq': signal_turn[2].get('seq'),
                            'timestamp': signal_turn[2].get('timestamp'),
                        },
                    })
                    stats['teaching_pairs_built'] += 1
                    if verbose:
                        print(f"\n 📚 TEACHING [{label}] conf={confidence:.0%}", file=sys.stderr)
                        print(f" lesson: {cleaned_teaching[:80]}...", file=sys.stderr)

    return {
        'dpo_pairs': dpo_pairs,
        'soft_redirect_pairs': soft_redirect_pairs,
        'sft_pairs': sft_pairs,
        'teaching_pairs': teaching_pairs,
        'stats': stats,
    }


# Keep backward-compatible function
def extract_dpo_pairs(events: list[dict], verbose: bool = False) -> tuple[list[dict], dict]:
    """Extract DPO pairs (backward compatible wrapper).

    Returns (pairs, stats) where pairs are the hard-correction pairs followed
    by the soft-redirect pairs and stats uses the old key names. Note that
    ``pairs_with_chosen`` only counts hard-correction pairs.
    """
    result = extract_all_signals(events, verbose=verbose)
    all_dpo = result['dpo_pairs'] + result['soft_redirect_pairs']
    stats = result['stats']
    # Map to old stats format
    old_stats = {
        'sessions': stats['sessions'],
        'corrections_found': stats['corrections_found'] + stats['soft_redirects_found'],
        'pairs_built': stats['dpo_pairs_built'] + stats['soft_redirect_pairs_built'],
        'pairs_with_chosen': stats['dpo_with_chosen'],
    }
    return all_dpo, old_stats


def to_dpo_training_format(pairs: list[dict]) -> list[dict]:
    """Convert to standard DPO training format for trl.DPOTrainer.

    Only includes pairs that have both chosen and rejected responses.
    """
    return [
        {
            'prompt': pair['prompt'],
            'chosen': pair['chosen'],
            'rejected': pair['rejected'],
        }
        for pair in pairs
        if pair.get('chosen')
    ]


def to_detailed_format(pairs: list[dict]) -> list[dict]:
    """Full format with metadata for inspection and debugging.

    ``chosen`` is always a string: pairs without a post-correction response
    (stored as None by ``build_dpo_pair``) are emitted with ``''`` and
    ``has_chosen`` set to False. The pair's metadata keys are flattened into
    the top level of each row.
    """
    return [{
        'prompt': p['prompt'],
        # `or ''` rather than a .get default: the key exists with value None.
        'chosen': p.get('chosen') or '',
        'rejected': p['rejected'],
        'correction': p['correction'],
        'has_chosen': bool(p.get('chosen')),
        **p['metadata'],
    } for p in pairs]


# --- CLI ---

def parse_duration(s: str) -> timedelta:
    """Parse '7d', '24h', '30m' to timedelta.

    Matching is case-insensitive. Raises ValueError for any other format.
    """
    m = re.match(r'^(\d+)([dhm])$', s.lower())
    if not m:
        raise ValueError(f"Invalid duration: {s}")
    v, u = int(m.group(1)), m.group(2)
    return {'d': timedelta(days=v), 'h': timedelta(hours=v), 'm': timedelta(minutes=v)}[u]


def _write_json(path: Path, data) -> None:
    """Write *data* as pretty-printed UTF-8 JSON to *path*."""
    # Explicit UTF-8 because ensure_ascii=False emits umlauts and emoji as-is.
    path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding='utf-8')


def main():
    """CLI entry point: load events, extract signals, print stats, write files.

    Progress and statistics go to stderr. Unless ``--dry-run`` is given,
    JSON files are written below ``$CLAWD_DIR/training-data`` (default
    ``~/clawd/training-data``); ``--output`` only overrides the path of the
    DPO training file.
    """
    parser = argparse.ArgumentParser(
        description='Extract DPO preference pairs from NATS event store',
    )
    parser.add_argument('--since', default='7d', help='Time window (e.g. 7d, 24h)')
    parser.add_argument('--output', '-o', help='Output file (default: auto)')
    parser.add_argument('--format', choices=['training', 'detailed', 'both'],
                        default='both', help='Output format')
    parser.add_argument('--min-confidence', type=float, default=0.7,
                        help='Minimum correction confidence (0-1)')
    parser.add_argument('--dry-run', action='store_true', help='Show stats only, no output')
    parser.add_argument('--verbose', '-v', action='store_true', help='Show each found pair')
    parser.add_argument('--sessions-dir', default=None,
                        help='Path to OpenClaw session JSONL dir (default: ~/.openclaw/agents/main/sessions)')
    parser.add_argument('--source', choices=['sessions', 'nats', 'auto'], default='auto',
                        help='Data source: session transcripts (preferred) or NATS events')
    args = parser.parse_args()

    print("🔍 DPO Preference Pair Extractor", file=sys.stderr)

    try:
        duration = parse_duration(args.since)
    except ValueError as exc:
        # Report bad --since values as a usage error instead of a traceback.
        parser.error(str(exc))
    hours = duration.total_seconds() / 3600

    # Determine data source
    sessions_dir = args.sessions_dir or os.path.expanduser('~/.openclaw/agents/main/sessions')
    use_sessions = args.source == 'sessions' or (
        args.source == 'auto' and os.path.isdir(sessions_dir)
    )

    if use_sessions:
        print(f" Source: Session transcripts ({sessions_dir})", file=sys.stderr)
        # Pass fractional hours through: truncating to int would turn
        # windows shorter than one hour (e.g. 30m) into an empty window.
        conv_events = load_session_transcripts(sessions_dir, since_hours=hours)
    else:
        print(" Source: NATS event store", file=sys.stderr)
        info = get_stream_info()
        last_seq = info['state']['last_seq']
        first_seq = info['state']['first_seq']
        # Rough heuristic: assume about 50 stream events per hour.
        estimated_events = int(hours * 50)
        start_seq = max(first_seq, last_seq - estimated_events)
        print(f" Scanning sequences {start_seq}-{last_seq}", file=sys.stderr)
        events = fetch_events_by_sequence(start_seq, last_seq)
        conv_events = [e for e in events if e.get('type', '').startswith('conversation.message')]
        print(f" {len(conv_events)} conversation events out of {len(events)} total", file=sys.stderr)

    # Extract ALL signals
    result = extract_all_signals(conv_events, verbose=args.verbose)
    stats = result['stats']

    # Filter by confidence
    min_conf = args.min_confidence
    dpo_pairs = [p for p in result['dpo_pairs'] if p['metadata']['confidence'] >= min_conf]
    soft_pairs = [p for p in result['soft_redirect_pairs'] if p['metadata']['confidence'] >= min_conf]
    sft_pairs = [p for p in result['sft_pairs'] if p['confidence'] >= min_conf]
    teaching_pairs = [p for p in result['teaching_pairs'] if p['confidence'] >= min_conf]

    # Count from the filtered list so it agrees with the pair count next to it.
    dpo_with_chosen = sum(1 for p in dpo_pairs if p.get('chosen'))

    # Stats
    print("\n📊 Results:", file=sys.stderr)
    print(f" Sessions scanned: {stats['sessions']}", file=sys.stderr)
    print("", file=sys.stderr)
    print(f" 🔴 Hard corrections: {stats['corrections_found']:3d} detected → "
          f"{len(dpo_pairs)} pairs ({dpo_with_chosen} with chosen)", file=sys.stderr)
    print(f" 🟡 Soft redirects: {stats['soft_redirects_found']:3d} detected → "
          f"{len(soft_pairs)} pairs", file=sys.stderr)
    print(f" 🟢 Positive signals: {stats['positives_found']:3d} detected → "
          f"{len(sft_pairs)} SFT pairs", file=sys.stderr)
    print(f" 📚 Teaching moments: {stats['teachings_found']:3d} detected → "
          f"{len(teaching_pairs)} pairs", file=sys.stderr)

    total = len(dpo_pairs) + len(soft_pairs) + len(sft_pairs) + len(teaching_pairs)
    print(f"\n 📦 Total training signal: {total} pairs", file=sys.stderr)

    if args.dry_run:
        for label, pairs_list, emoji in [
            ('DPO (hard)', dpo_pairs, '🔴'),
            ('DPO (soft)', soft_pairs, '🟡'),
            ('SFT (positive)', sft_pairs, '🟢'),
            ('Teaching', teaching_pairs, '📚'),
        ]:
            if not pairs_list:
                continue
            print(f"\n{emoji} {label} — sample:", file=sys.stderr)
            for p in pairs_list[:3]:
                if 'prompt' in p:
                    print(f" prompt: {p['prompt'][:100]}", file=sys.stderr)
                if 'correction' in p:
                    print(f" correction: {p['correction'][:100]}", file=sys.stderr)
                if 'response' in p:
                    print(f" response: {p['response'][:100]}", file=sys.stderr)
                if 'lesson' in p:
                    print(f" lesson: {p['lesson'][:100]}", file=sys.stderr)
                if p.get('metadata', {}).get('praise_text'):
                    print(f" praise: {p['metadata']['praise_text'][:80]}", file=sys.stderr)
                print(" ---", file=sys.stderr)
        return

    # Output
    output_dir = Path(os.environ.get('CLAWD_DIR', Path.home() / 'clawd')) / 'training-data'
    output_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime('%Y-%m-%d')

    all_dpo = dpo_pairs + soft_pairs

    if args.format in ('training', 'both'):
        # DPO training format
        training_data = to_dpo_training_format(all_dpo)
        if args.output:
            path = Path(args.output).expanduser()
        else:
            path = output_dir / f'dpo-training-{timestamp}.json'
        _write_json(path, training_data)
        print(f"\n✅ DPO training: {path} ({len(training_data)} pairs)", file=sys.stderr)

        # SFT training format (positive reinforcement)
        sft_data = [
            {'instruction': p['prompt'], 'input': '', 'output': p['response']}
            for p in sft_pairs
        ]
        sft_path = output_dir / f'sft-positive-{timestamp}.json'
        _write_json(sft_path, sft_data)
        print(f"✅ SFT positive: {sft_path} ({len(sft_data)} pairs)", file=sys.stderr)

        # Teaching pairs
        teach_data = [
            {'lesson': p['lesson'], 'context': p.get('context', ''), 'label': p['signal_label']}
            for p in teaching_pairs
        ]
        teach_path = output_dir / f'teaching-{timestamp}.json'
        _write_json(teach_path, teach_data)
        print(f"✅ Teaching: {teach_path} ({len(teach_data)} pairs)", file=sys.stderr)

    if args.format in ('detailed', 'both'):
        detailed_data = {
            'dpo_pairs': to_detailed_format(all_dpo),
            'sft_pairs': sft_pairs,
            'teaching_pairs': teaching_pairs,
            'stats': stats,
            'extracted_at': datetime.now().isoformat(),
        }
        path = output_dir / f'signals-detailed-{timestamp}.json'
        _write_json(path, detailed_data)
        print(f"✅ Detailed: {path}", file=sys.stderr)


if __name__ == '__main__':
    main()