darkplex-core/cortex/feedback_loop.py
Claudia 734f96cfcf
All checks were successful
Tests / test (push) Successful in 2s
refactor: remove all hardcoded paths, use env vars + config
All ~/clawd/ references replaced with configurable paths:
- CORTEX_HOME (default: ~/.cortex)
- CORTEX_MEMORY_DIR, CORTEX_CONFIG, CORTEX_GROWTH_LOG, CORTEX_ROADMAP
- permanent_files configurable via config.json
- Tests pass both with and without env vars set
- 169/169 tests green
2026-02-09 12:13:18 +01:00

393 lines
14 KiB
Python

#!/usr/bin/env python3
"""Feedback Loop Engine — automatically extract lessons from session transcripts.
Scans OpenClaw session JSONL files, detects patterns (corrections, retries,
tool failures, knowledge gaps, self-acknowledged errors), and appends
findings to growth-log.md.
Usage:
python3 feedback_loop.py --since 24h
python3 feedback_loop.py --since 7d --dry-run
python3 feedback_loop.py --session <session-file>
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
DEFAULT_CONFIG_PATH = Path(__file__).parent / "config.json"
def load_config(path: Path | None = None) -> dict:
p = path or DEFAULT_CONFIG_PATH
if p.exists():
with open(p) as f:
return json.load(f)
return {}
def expand(p: str) -> Path:
    """Expand a leading ``~``/``~user`` in *p* and return it as a Path."""
    expanded = os.path.expanduser(p)
    return Path(expanded)
def parse_since(since: str) -> datetime:
    """Parse '24h', '7d', '2h', etc. into a UTC datetime.

    Raises ValueError for anything that is not <digits><h|d|m>.
    """
    match = re.fullmatch(r"(\d+)([hdm])", since.strip())
    if match is None:
        raise ValueError(f"Invalid --since format: {since!r}. Use e.g. 24h, 7d, 30m")
    amount = int(match.group(1))
    unit_deltas = {
        "h": timedelta(hours=amount),
        "d": timedelta(days=amount),
        "m": timedelta(minutes=amount),
    }
    return datetime.now(timezone.utc) - unit_deltas[match.group(2)]
def parse_jsonl(path: Path) -> list[dict]:
    """Parse a JSONL file into a list of entries.

    Blank lines and malformed JSON lines are skipped silently — live session
    files are appended to while we read them, so partial lines are expected.
    """
    entries: list[dict] = []
    with open(path) as f:
        # Note: the original bound an unused enumerate() index here; removed.
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return entries
def get_session_timestamp(entries: list[dict]) -> datetime | None:
"""Extract session start timestamp."""
for e in entries:
ts = e.get("timestamp")
if ts:
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except (ValueError, TypeError):
pass
return None
def get_text_content(message: dict) -> str:
    """Join all text fragments of a message entry into one space-separated string."""
    content = message.get("message", {}).get("content", "")
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    fragments = [
        part.get("text", "")
        for part in content
        if isinstance(part, dict) and part.get("type") == "text"
    ]
    return " ".join(fragments)
def get_tool_name(message: dict) -> str | None:
"""Extract tool name from a toolCall content item."""
msg = message.get("message", {})
content = msg.get("content", [])
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") == "toolCall":
return item.get("name")
return None
def get_tool_result_error(entry: dict) -> str | None:
"""Check if a toolResult entry contains an error."""
msg = entry.get("message", {})
if msg.get("isError"):
content = msg.get("content", [])
if isinstance(content, list):
for item in content:
if isinstance(item, dict):
return item.get("text", "error")
return "error"
return None
# --- Detection Heuristics ---
@dataclass
class Finding:
    """One detected pattern in a session transcript.

    Converted from a hand-written attribute-bag class to a dataclass:
    same positional/keyword constructor and default, plus repr/eq for free.
    """
    category: str            # detector id, e.g. "correction", "retry"
    trigger: str             # human-readable description of what fired
    lines: tuple[int, int]   # 1-based (start, end) entry range in the session
    session_file: str        # path of the session JSONL the finding came from
    confidence: float = 1.0  # heuristic confidence, filtered by min_confidence
def detect_corrections(entries: list[dict], config: dict, session_file: str) -> list[Finding]:
    """Flag user messages that push back immediately after an assistant turn.

    A finding fires when a user message directly following an assistant
    message contains one of the configured correction keywords as a whole word.
    """
    keywords = config.get("detection", {}).get("correction_keywords",
                                               ["nein", "falsch", "wrong", "nicht", "stop"])
    findings: list[Finding] = []
    last_assistant_idx = None  # index of the most recent assistant entry, if unanswered
    for idx, entry in enumerate(entries):
        role = entry.get("message", {}).get("role")
        if role == "assistant":
            last_assistant_idx = idx
        elif role == "user":
            if last_assistant_idx is not None:
                lowered = get_text_content(entry).lower()
                # First keyword (in config order) matching as a whole word wins.
                hit = next((kw for kw in keywords
                            if re.search(r'\b' + re.escape(kw) + r'\b', lowered)), None)
                if hit is not None:
                    findings.append(Finding(
                        category="correction",
                        trigger=f"User said '{hit}' after agent response",
                        lines=(last_assistant_idx + 1, idx + 1),
                        session_file=session_file,
                        confidence=0.7,
                    ))
            # A user turn consumes the pending assistant turn either way.
            last_assistant_idx = None
        # Other roles (toolResult etc.) leave the pending assistant turn intact.
    return findings
def detect_retries(entries: list[dict], config: dict, session_file: str) -> list[Finding]:
    """Flag the same tool invoked `retry_threshold`-or-more times in a row.

    Entries without a tool call do not break a run — only a *different* tool
    does (matches the original heuristic).
    """
    threshold = config.get("detection", {}).get("retry_threshold", 3)
    findings: list[Finding] = []

    def flush(tool: str | None, count: int, start: int, end: int) -> None:
        # Record a completed run only once it reached the threshold.
        if tool and count >= threshold:
            findings.append(Finding(
                category="retry",
                trigger=f"Tool '{tool}' called {count} times in sequence",
                lines=(start + 1, end),
                session_file=session_file,
            ))

    run_tool: str | None = None
    run_count = 0
    run_start = 0
    for idx, entry in enumerate(entries):
        name = get_tool_name(entry)
        if not name:
            continue
        if name == run_tool:
            run_count += 1
        else:
            flush(run_tool, run_count, run_start, idx)
            run_tool, run_count, run_start = name, 1, idx
    # Close out a run that extends to the end of the session.
    flush(run_tool, run_count, run_start, len(entries))
    return findings
def detect_tool_failures(entries: list[dict], config: dict, session_file: str) -> list[Finding]:
    """Flag explicit tool errors, plus tool results that merely look like errors.

    Explicit isError results produce a full-confidence finding; substring
    matches against error_patterns only score 0.5 (reviewable heuristic).
    """
    patterns = config.get("detection", {}).get("error_patterns",
                                               ["error", "Error", "failed", "Failed"])
    findings: list[Finding] = []
    for idx, entry in enumerate(entries):
        if entry.get("type") != "message":
            continue
        line_no = idx + 1
        # Case 1: the result is explicitly flagged as an error.
        explicit = get_tool_result_error(entry)
        if explicit:
            findings.append(Finding(
                category="tool_failure",
                trigger=f"Tool returned error: {explicit[:120]}",
                lines=(line_no, line_no),
                session_file=session_file,
            ))
            continue
        # Case 2: scan toolResult text for configured error substrings.
        msg = entry.get("message", {})
        if msg.get("role") != "toolResult":
            continue
        content = msg.get("content", [])
        if not isinstance(content, list):
            continue
        for item in content:
            text = item.get("text", "") if isinstance(item, dict) else ""
            matched = next((pat for pat in patterns if pat in text), None)
            if matched is not None:
                findings.append(Finding(
                    category="tool_failure",
                    trigger=f"Tool result contains '{matched}': {text[:100]}",
                    lines=(line_no, line_no),
                    session_file=session_file,
                    confidence=0.5,
                ))
                break  # at most one heuristic finding per entry
    return findings
def detect_self_errors(entries: list[dict], config: dict, session_file: str) -> list[Finding]:
    """Flag assistant messages that acknowledge a mistake (apology keywords)."""
    keywords = config.get("detection", {}).get("apology_keywords",
                                               ["sorry", "entschuldigung", "my mistake"])
    findings: list[Finding] = []
    for idx, entry in enumerate(entries):
        if entry.get("message", {}).get("role") != "assistant":
            continue
        lowered = get_text_content(entry).lower()
        # Keywords from config may be mixed-case; compare lowercase both sides.
        hit = next((kw for kw in keywords if kw.lower() in lowered), None)
        if hit is not None:
            findings.append(Finding(
                category="self_error",
                trigger=f"Agent acknowledged error with '{hit}'",
                lines=(idx + 1, idx + 1),
                session_file=session_file,
                confidence=0.6,
            ))
    return findings
def detect_knowledge_gaps(entries: list[dict], config: dict, session_file: str) -> list[Finding]:
    """Flag spots where the agent came up empty and the user supplied the answer.

    An assistant message containing a gap phrase, followed within the next
    three entries by a substantial (>20 chars) user message, becomes a finding.
    """
    gap_phrases = ["i don't know", "ich weiß nicht", "couldn't find", "konnte nicht finden",
                   "unable to", "no results", "not found", "keine ergebnisse"]
    findings: list[Finding] = []
    for idx, entry in enumerate(entries):
        if entry.get("message", {}).get("role") != "assistant":
            continue
        lowered = get_text_content(entry).lower()
        if not any(phrase in lowered for phrase in gap_phrases):
            continue
        # Look at the next few entries for the first user reply.
        for follow in range(idx + 1, min(idx + 4, len(entries))):
            if entries[follow].get("message", {}).get("role") != "user":
                continue
            if len(get_text_content(entries[follow])) > 20:  # substantial answer
                findings.append(Finding(
                    category="knowledge_gap",
                    trigger="Agent couldn't find answer, user provided it",
                    lines=(idx + 1, follow + 1),
                    session_file=session_file,
                    confidence=0.6,
                ))
            break  # only the first user reply after the gap counts
    return findings
def analyze_session(entries: list[dict], config: dict, session_file: str) -> list[Finding]:
    """Run every detector over the session; keep findings above min_confidence."""
    min_conf = config.get("detection", {}).get("min_confidence", 0.5)
    detectors = (detect_corrections, detect_retries, detect_tool_failures,
                 detect_self_errors, detect_knowledge_gaps)
    collected: list[Finding] = []
    for detect in detectors:
        collected.extend(detect(entries, config, session_file))
    return [f for f in collected if f.confidence >= min_conf]
def finding_to_markdown(finding: Finding, config: dict) -> str:
    """Render one finding as a growth-log markdown section."""
    labels = config.get("categories", {})
    fallback = finding.category.replace("_", " ").title()
    label = labels.get(finding.category, fallback)
    today = datetime.now().strftime("%Y-%m-%d")
    stem = Path(finding.session_file).stem
    start, end = finding.lines
    return (
        f"### {today} — [Auto-detected: {label}]\n"
        f"- **Trigger:** {finding.trigger}\n"
        "- **Erkenntnis:** Auto-detected pattern — review recommended\n"
        "- **Neue Fähigkeit:** Awareness of recurring pattern\n"
        f"- **Source:** {stem} lines {start}-{end}\n"
    )
def deduplicate_findings(findings: list[Finding]) -> list[Finding]:
    """Drop findings whose (category, 60-char trigger prefix) was already seen.

    Preserves first-occurrence order.
    """
    unique: list[Finding] = []
    seen: set[tuple[str, str]] = set()
    for finding in findings:
        fingerprint = (finding.category, finding.trigger[:60])
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        unique.append(finding)
    return unique
def append_to_growth_log(findings: list[Finding], config: dict, dry_run: bool = False) -> str:
    """Append findings to growth-log.md. Returns the text that was/would be appended.

    Returns "" for an empty findings list; with dry_run the text is returned
    but nothing is written.
    """
    if not findings:
        return ""
    sections = [finding_to_markdown(f, config) + "\n" for f in findings]
    text = "\n---\n\n" + "".join(sections)
    if dry_run:
        return text
    # Imported lazily so module import does not require the config package.
    from cortex.config import growth_log
    target = expand(config.get("growth_log_path", str(growth_log())))
    with open(target, "a") as out:
        out.write(text)
    return text
def get_recent_sessions(config: dict, since: datetime) -> list[Path]:
    """Return session JSONL files modified at/after *since*, newest first."""
    root = expand(config.get("sessions_dir", "~/.openclaw/agents/main/sessions"))
    if not root.exists():
        return []

    def modified_at(path: Path) -> datetime:
        return datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)

    recent = [p for p in root.glob("*.jsonl") if modified_at(p) >= since]
    return sorted(recent, key=lambda p: p.stat().st_mtime, reverse=True)
def run(since: str = "24h", session: str | None = None, dry_run: bool = False,
    config_path: str | None = None) -> list[Finding]:
    """Main entry point.

    Analyzes either one explicit session file or all sessions modified within
    the --since window, deduplicates findings, and appends them to the growth
    log (unless dry_run, set either by argument or by config).
    """
    config = load_config(Path(config_path) if config_path else None)
    # Config can force dry-run globally.
    dry_run = dry_run or bool(config.get("dry_run"))
    if session:
        target = Path(session)
        if not target.exists():
            print(f"Session file not found: {session}", file=sys.stderr)
            sys.exit(1)
        sessions = [target]
    else:
        sessions = get_recent_sessions(config, parse_since(since))
    findings: list[Finding] = []
    for path in sessions:
        findings.extend(analyze_session(parse_jsonl(path), config, str(path)))
    findings = deduplicate_findings(findings)
    rendered = append_to_growth_log(findings, config, dry_run=dry_run)
    if rendered:
        if dry_run:
            print("=== DRY RUN — would append: ===")
        print(rendered)
    else:
        print("No patterns detected.")
    return findings
def main():
    """CLI wrapper: parse arguments and delegate to run()."""
    cli = argparse.ArgumentParser(description="Feedback Loop Engine")
    cli.add_argument("--since", default="24h", help="Time window (e.g. 24h, 7d, 30m)")
    cli.add_argument("--session", help="Analyze a specific session file")
    cli.add_argument("--dry-run", action="store_true", help="Don't write to growth-log")
    cli.add_argument("--config", help="Path to config.json")
    options = cli.parse_args()
    run(since=options.since, session=options.session,
        dry_run=options.dry_run, config_path=options.config)


if __name__ == "__main__":
    main()