#!/usr/bin/env python3 """Sub-Agent Output Validator — post-completion validation of sub-agent work. Validates that spawned tasks actually produced what was requested: files created, non-empty, syntax-valid, tests passed, errors addressed. Usage: python3 validate_output.py --transcript /path/to/session.jsonl --task "description" python3 validate_output.py --session-key "agent:main:subagent:xxx" """ import argparse import ast import json import os import re import subprocess import sys from pathlib import Path from typing import Any DEFAULT_SESSIONS_DIR = Path(os.path.expanduser("~/.openclaw/agents/main/sessions")) def parse_jsonl(path: Path) -> list[dict]: entries = [] with open(path) as f: for line in f: line = line.strip() if not line: continue try: entries.append(json.loads(line)) except json.JSONDecodeError: pass return entries def find_session_by_key(session_key: str) -> Path | None: """Find a session JSONL by its session key (searches session headers).""" sessions_dir = DEFAULT_SESSIONS_DIR if not sessions_dir.exists(): return None # session key format: agent:main:subagent:UUID — extract UUID parts = session_key.split(":") uuid_part = parts[-1] if parts else session_key candidate = sessions_dir / f"{uuid_part}.jsonl" if candidate.exists(): return candidate # Search all files for f in sessions_dir.glob("*.jsonl"): try: with open(f) as fh: first_line = fh.readline().strip() if first_line: header = json.loads(first_line) if header.get("id") == uuid_part or session_key in json.dumps(header): return f except (json.JSONDecodeError, OSError): pass return None def extract_mentioned_files(task: str) -> list[str]: """Extract file paths/names mentioned in a task description.""" patterns = [ r'`([^`]+\.\w{1,5})`', # `filename.ext` r'(\S+\.(?:py|ts|js|json|md|yaml|yml|toml|sh))', # bare filenames ] files = set() for pat in patterns: for m in re.finditer(pat, task): f = m.group(1) if "/" not in f and not f.startswith("."): files.add(f) elif f.startswith("~/") or f.startswith("./") or f.startswith("/"): files.add(f) return list(files) def extract_created_files_from_transcript(entries: list[dict]) -> list[str]: """Extract files that were written/created during the session.""" files = set() for entry in entries: msg = entry.get("message", {}) content = msg.get("content", []) if isinstance(content, list): for item in content: if not isinstance(item, dict): continue # Tool calls to write/edit if item.get("type") == "toolCall": name = item.get("name", "") args = item.get("arguments", {}) if isinstance(args, str): try: args = json.loads(args) except json.JSONDecodeError: args = {} if name in ("write", "Write"): fp = args.get("file_path") or args.get("path", "") if fp: files.add(fp) elif name in ("edit", "Edit"): fp = args.get("file_path") or args.get("path", "") if fp: files.add(fp) # Also check exec commands for file creation if isinstance(content, list): for item in content: if isinstance(item, dict) and item.get("type") == "toolCall": if item.get("name") in ("exec", "Exec"): args = item.get("arguments", {}) if isinstance(args, str): try: args = json.loads(args) except json.JSONDecodeError: args = {} cmd = args.get("command", "") # Detect mkdir, touch, tee, etc. for m in re.finditer(r'(?:tee|>)\s+(\S+)', cmd): files.add(m.group(1)) return list(files) def extract_transcript_errors(entries: list[dict]) -> list[str]: """Extract unaddressed error messages from transcript.""" errors = [] for entry in entries: msg = entry.get("message", {}) if msg.get("isError"): content = msg.get("content", []) if isinstance(content, list): for item in content: if isinstance(item, dict): errors.append(item.get("text", "unknown error")[:200]) elif isinstance(content, str): errors.append(content[:200]) return errors def check_test_results(entries: list[dict]) -> dict: """Scan transcript for test execution results.""" result = {"tests_found": False, "tests_passed": None, "details": ""} for entry in entries: msg = entry.get("message", {}) content = msg.get("content", []) if isinstance(content, list): for item in content: text = "" if isinstance(item, dict): text = item.get("text", "") if not text: continue # pytest output if re.search(r'\d+ passed', text): result["tests_found"] = True if "failed" in text.lower(): result["tests_passed"] = False result["details"] = text[:200] else: result["tests_passed"] = True result["details"] = text[:200] # unittest output if re.search(r'OK\s*$', text.strip()): result["tests_found"] = True result["tests_passed"] = True if re.search(r'FAILED', text): result["tests_found"] = True result["tests_passed"] = False result["details"] = text[:200] return result class ValidationReport: def __init__(self): self.checks: list[dict] = [] self.passed = 0 self.failed = 0 self.warnings = 0 def add(self, name: str, status: str, detail: str = ""): """status: pass, fail, warn""" self.checks.append({"name": name, "status": status, "detail": detail}) if status == "pass": self.passed += 1 elif status == "fail": self.failed += 1 else: self.warnings += 1 @property def ok(self) -> bool: return self.failed == 0 def to_json(self) -> dict: return { "passed": self.passed, "failed": self.failed, "warnings": self.warnings, "ok": self.ok, "checks": self.checks, } def to_human(self) -> str: lines = [f"Validation Report: {'✅ PASS' if self.ok else '❌ FAIL'}", f" {self.passed} passed, {self.failed} failed, {self.warnings} warnings", ""] for c in self.checks: icon = {"pass": "✅", "fail": "❌", "warn": "⚠️"}.get(c["status"], "?") line = f" {icon} {c['name']}" if c["detail"]: line += f" — {c['detail']}" lines.append(line) return "\n".join(lines) def validate_file_exists(path: str) -> tuple[bool, str]: """Check if file exists and is non-empty.""" p = Path(os.path.expanduser(path)) if not p.exists(): return False, f"File not found: {path}" if p.stat().st_size == 0: return False, f"File is empty: {path}" return True, f"Exists, {p.stat().st_size} bytes" def validate_python_syntax(path: str) -> tuple[bool, str]: """Check Python file for syntax errors.""" p = Path(os.path.expanduser(path)) if not p.exists(): return False, "File not found" try: source = p.read_text() compile(source, str(p), "exec") return True, "Syntax OK" except SyntaxError as e: return False, f"Syntax error: {e}" def validate_typescript_structure(path: str) -> tuple[bool, str]: """Basic TypeScript validation — check for exports, no excessive 'any'.""" p = Path(os.path.expanduser(path)) if not p.exists(): return False, "File not found" content = p.read_text() issues = [] any_count = len(re.findall(r'\bany\b', content)) if any_count > 10: issues.append(f"Excessive 'any' usage ({any_count} occurrences)") if not re.search(r'\bexport\b', content): issues.append("No exports found") if issues: return False, "; ".join(issues) return True, "Structure OK" def validate_staging_location(files: list[str], staging_dir: str = "staging") -> list[tuple[str, bool, str]]: """Check if files are in the staging directory.""" results = [] for f in files: expanded = os.path.expanduser(f) in_staging = staging_dir in expanded results.append((f, in_staging, "In staging" if in_staging else "Not in staging dir")) return results def run_validation(transcript_path: Path, task: str) -> ValidationReport: """Main validation logic.""" report = ValidationReport() entries = parse_jsonl(transcript_path) if not entries: report.add("Transcript readable", "fail", "No entries found") return report report.add("Transcript readable", "pass", f"{len(entries)} entries") # Check mentioned files exist mentioned = extract_mentioned_files(task) created = extract_created_files_from_transcript(entries) all_files = list(set(mentioned + created)) for f in all_files: exists, detail = validate_file_exists(f) report.add(f"File exists: {Path(f).name}", "pass" if exists else "fail", detail) # Syntax check Python files for f in all_files: if f.endswith(".py"): ok, detail = validate_python_syntax(f) report.add(f"Python syntax: {Path(f).name}", "pass" if ok else "fail", detail) # Syntax check TypeScript files for f in all_files: if f.endswith(".ts") or f.endswith(".tsx"): ok, detail = validate_typescript_structure(f) report.add(f"TS structure: {Path(f).name}", "pass" if ok else "fail", detail) # Check staging location if mentioned in task if "staging" in task.lower(): for f in all_files: expanded = os.path.expanduser(f) in_staging = "staging" in expanded report.add(f"In staging: {Path(f).name}", "pass" if in_staging else "warn", "In staging dir" if in_staging else "Not in staging dir") # Check test results if tests were requested if any(w in task.lower() for w in ["test", "tests", "pytest", "unittest"]): test_results = check_test_results(entries) if test_results["tests_found"]: if test_results["tests_passed"]: report.add("Tests passed", "pass", test_results["details"][:100]) else: report.add("Tests passed", "fail", test_results["details"][:100]) else: report.add("Tests executed", "warn", "No test output found in transcript") # Check for unaddressed errors errors = extract_transcript_errors(entries) if errors: report.add("Unaddressed errors", "warn", f"{len(errors)} error(s) in transcript") else: report.add("No unaddressed errors", "pass") return report def main(): parser = argparse.ArgumentParser(description="Sub-Agent Output Validator") parser.add_argument("--transcript", help="Path to session JSONL file") parser.add_argument("--session-key", help="Session key (e.g. agent:main:subagent:xxx)") parser.add_argument("--task", default="", help="Original task description") parser.add_argument("--json", action="store_true", help="Output JSON only") args = parser.parse_args() if not args.transcript and not args.session_key: parser.error("Either --transcript or --session-key is required") if args.session_key: path = find_session_by_key(args.session_key) if not path: print(f"Session not found: {args.session_key}", file=sys.stderr) sys.exit(1) else: path = Path(args.transcript) if not path.exists(): print(f"File not found: {args.transcript}", file=sys.stderr) sys.exit(1) report = run_validation(path, args.task) if args.json: print(json.dumps(report.to_json(), indent=2)) else: print(report.to_human()) print() print(json.dumps(report.to_json(), indent=2)) sys.exit(0 if report.ok else 1) if __name__ == "__main__": main()