#!/usr/bin/env python3 """Cortex Init — first-time setup. Usage: cortex init [--home ~/.cortex] [--enable-all] [--non-interactive] """ import argparse import json import os import sys from pathlib import Path from cortex.config import cortex_home, memory_dir, config_path, logs_dir from cortex.scheduler import JOBS, enable_job def _prompt(msg: str, default: str = "y") -> bool: """Simple y/n prompt.""" suffix = " [Y/n] " if default == "y" else " [y/N] " try: answer = input(msg + suffix).strip().lower() except (EOFError, KeyboardInterrupt): print() return default == "y" if not answer: return default == "y" return answer in ("y", "yes", "ja", "j") def init(home: Path | None = None, enable_all: bool = False, non_interactive: bool = False) -> bool: """Initialize cortex workspace.""" base = home or cortex_home() mem = memory_dir() logs = logs_dir() cfg = config_path() print(f"🧠 Cortex Init") print(f" Home: {base}") print(f" Memory: {mem}") print(f" Config: {cfg}") print() # Create directories for d in [base, mem, mem / "archive", logs]: d.mkdir(parents=True, exist_ok=True) print(f" 📁 {d}") # Create default config if not exists if not cfg.exists(): default_config = { "version": "0.1.0", "permanent_files": ["README.md"], "sessions_dir": "~/.openclaw/agents/main/sessions", } cfg.write_text(json.dumps(default_config, indent=2) + "\n") print(f" 📄 {cfg} (created)") else: print(f" 📄 {cfg} (exists)") # Create default memory README readme = mem / "README.md" if not readme.exists(): readme.write_text("# Memory\n\nCortex memory directory. Files here are managed by `cortex hygiene`.\n") print(f" 📄 {readme} (created)") print() # Smoke test print(" 🔍 Smoke test...") try: from cortex.triage import score_task r = score_task("test task") print(f" ✅ Triage: priority={r.priority:.2f}") except Exception as e: print(f" ❌ Triage failed: {e}") try: from cortex.memory_hygiene import gather_stats s = gather_stats() print(f" ✅ Hygiene: {s['total_files']} files, {s['total_size_human']}") except Exception as e: print(f" ❌ Hygiene failed: {e}") print() # Schedule jobs if enable_all: jobs_to_enable = list(JOBS.keys()) elif non_interactive: jobs_to_enable = [] else: print(" 📅 Schedule periodic jobs?") jobs_to_enable = [] for name, jdef in JOBS.items(): interval = jdef["default_interval_min"] if _prompt(f" Enable {name} (every {interval}min)?"): jobs_to_enable.append(name) for job in jobs_to_enable: ok = enable_job(job) if ok: print(f" ✅ {job} scheduled") else: print(f" ❌ {job} failed to schedule") print() print(" 🧠 Cortex ready!") if jobs_to_enable: print(f" Run 'cortex schedule status' to verify jobs.") print(f" Run 'cortex --help' for all commands.") return True def main(): parser = argparse.ArgumentParser(description="Initialize Cortex") parser.add_argument("--home", type=Path, help="Cortex home directory") parser.add_argument("--enable-all", action="store_true", help="Enable all scheduled jobs") parser.add_argument("--non-interactive", action="store_true", help="Skip interactive prompts") args = parser.parse_args() init(home=args.home, enable_all=args.enable_all, non_interactive=args.non_interactive) if __name__ == "__main__": main()