All checks were successful
Tests / test (push) Successful in 2s
- cortex init: creates workspace, smoke tests, optional job setup - cortex schedule list|status|enable|disable|logs - Linux: systemd user timers - macOS: launchd plists - Jobs: feedback (6h), hygiene (daily), health (30min) - --interval flag for custom intervals - Replaces external OpenClaw cron jobs - 169/169 tests green
129 lines
3.8 KiB
Python
129 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
"""Cortex Init — first-time setup.
|
|
|
|
Usage:
|
|
cortex init [--home ~/.cortex] [--enable-all] [--non-interactive]
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from cortex.config import cortex_home, memory_dir, config_path, logs_dir
|
|
from cortex.scheduler import JOBS, enable_job
|
|
|
|
|
|
def _prompt(msg: str, default: str = "y") -> bool:
    """Ask a yes/no question on stdin and return the answer as a bool.

    Args:
        msg: Question text; a ``[Y/n]`` or ``[y/N]`` hint is appended to it.
        default: ``"y"`` makes "yes" the default answer; any other value
            makes "no" the default.

    Returns:
        The default when the reply is empty or input ends / is interrupted
        (EOF or Ctrl-C); otherwise ``True`` only for ``y``, ``yes``, ``ja``
        or ``j`` (case-insensitive, surrounding whitespace ignored).
    """
    default_is_yes = default == "y"
    hint = " [Y/n] " if default_is_yes else " [y/N] "

    try:
        reply = input(msg + hint)
    except (EOFError, KeyboardInterrupt):
        # Finish the interrupted prompt line before falling back.
        print()
        return default_is_yes

    reply = reply.strip().lower()
    if reply == "":
        return default_is_yes
    return reply in {"y", "yes", "ja", "j"}
|
|
|
|
|
|
def init(home: Path | None = None, enable_all: bool = False,
         non_interactive: bool = False) -> bool:
    """Initialize the Cortex workspace.

    Creates the home, memory, memory-archive and log directories, writes a
    default config file and a memory README when they are missing, runs two
    best-effort smoke tests and optionally schedules the periodic jobs.

    Args:
        home: Workspace root. When omitted, ``cortex_home()`` is used. When
            given, a leading ``~`` is expanded and every default path that
            lives under ``cortex_home()`` is re-rooted under ``home`` so the
            whole workspace ends up in the requested location.
        enable_all: Schedule every job in ``JOBS`` without asking.
        non_interactive: Never prompt; nothing is scheduled unless
            ``enable_all`` is set.

    Returns:
        Always ``True``. Smoke-test and scheduling failures are only
        reported on stdout.
    """
    default_home = cortex_home()
    # expanduser: the shell does not expand "~" in "--home=~/dir", which
    # would otherwise create a directory literally named "~".
    base = Path(home).expanduser() if home else default_home

    def _under_base(path: Path) -> Path:
        """Re-root *path* from the default home onto *base* when it lies inside it."""
        if base == default_home:
            return path
        try:
            return base / path.relative_to(default_home)
        except ValueError:
            # Path is configured outside the default home; leave it alone.
            return path

    # NOTE(review): the smoke tests and scheduled jobs below still resolve
    # their paths through cortex.config — confirm that module is pointed at
    # the same home when a custom ``home`` is used.
    mem = _under_base(memory_dir())
    logs = _under_base(logs_dir())
    cfg = _under_base(config_path())

    print("🧠 Cortex Init")
    print(f" Home: {base}")
    print(f" Memory: {mem}")
    print(f" Config: {cfg}")
    print()

    # Create directories
    for d in [base, mem, mem / "archive", logs]:
        d.mkdir(parents=True, exist_ok=True)
        print(f" 📁 {d}")

    # Create default config if not exists
    if not cfg.exists():
        default_config = {
            "version": "0.1.0",
            "permanent_files": ["README.md"],
            "sessions_dir": "~/.openclaw/agents/main/sessions",
        }
        # The config file is not guaranteed to live in one of the
        # directories created above.
        cfg.parent.mkdir(parents=True, exist_ok=True)
        cfg.write_text(json.dumps(default_config, indent=2) + "\n",
                       encoding="utf-8")
        print(f" 📄 {cfg} (created)")
    else:
        print(f" 📄 {cfg} (exists)")

    # Create default memory README
    readme = mem / "README.md"
    if not readme.exists():
        readme.write_text(
            "# Memory\n\nCortex memory directory. Files here are managed by `cortex hygiene`.\n",
            encoding="utf-8",
        )
        print(f" 📄 {readme} (created)")

    print()

    # Smoke test: deliberately best-effort, a failure is reported but never
    # aborts the setup.
    print(" 🔍 Smoke test...")
    try:
        from cortex.triage import score_task
        r = score_task("test task")
        print(f" ✅ Triage: priority={r.priority:.2f}")
    except Exception as e:
        print(f" ❌ Triage failed: {e}")

    try:
        from cortex.memory_hygiene import gather_stats
        s = gather_stats()
        print(f" ✅ Hygiene: {s['total_files']} files, {s['total_size_human']}")
    except Exception as e:
        print(f" ❌ Hygiene failed: {e}")

    print()

    # Schedule jobs
    if enable_all:
        jobs_to_enable = list(JOBS)
    elif non_interactive:
        jobs_to_enable = []
    else:
        print(" 📅 Schedule periodic jobs?")
        jobs_to_enable = []
        for name, jdef in JOBS.items():
            interval = jdef["default_interval_min"]
            if _prompt(f" Enable {name} (every {interval}min)?"):
                jobs_to_enable.append(name)

    for job in jobs_to_enable:
        if enable_job(job):
            print(f" ✅ {job} scheduled")
        else:
            print(f" ❌ {job} failed to schedule")

    print()
    print(" 🧠 Cortex ready!")
    if jobs_to_enable:
        print(" Run 'cortex schedule status' to verify jobs.")
    print(" Run 'cortex --help' for all commands.")
    return True
|
|
|
|
|
|
def main():
    """Parse the ``cortex init`` command-line flags and run :func:`init`."""
    cli = argparse.ArgumentParser(description="Initialize Cortex")
    cli.add_argument("--home", type=Path, help="Cortex home directory")

    # Both switches are plain on/off flags, so register them the same way.
    for flag, description in (
        ("--enable-all", "Enable all scheduled jobs"),
        ("--non-interactive", "Skip interactive prompts"),
    ):
        cli.add_argument(flag, action="store_true", help=description)

    opts = cli.parse_args()
    init(
        home=opts.home,
        enable_all=opts.enable_all,
        non_interactive=opts.non_interactive,
    )
|
|
|
|
|
|
# Run the setup only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|