feat: cortex init + schedule — self-managed jobs via systemd/launchd
All checks were successful
Tests / test (push) Successful in 2s

- cortex init: creates workspace, smoke tests, optional job setup
- cortex schedule list|status|enable|disable|logs
- Linux: systemd user timers
- macOS: launchd plists
- Jobs: feedback (6h), hygiene (daily), health (30min)
- --interval flag for custom intervals
- Replaces external OpenClaw cron jobs
- 169/169 tests green
This commit is contained in:
Claudia 2026-02-09 12:19:15 +01:00
parent 734f96cfcf
commit cbd3556a09
3 changed files with 588 additions and 1 deletions

View file

@ -2,6 +2,8 @@
"""Cortex CLI — unified entry point for all intelligence modules.
Usage:
cortex init [--enable-all] [--non-interactive]
cortex schedule list|status|enable|disable|logs <job>
cortex triage score "task description"
cortex health [--json]
cortex feedback --since 6h [--dry-run]
@ -25,7 +27,15 @@ def main():
# Strip the command from argv so sub-modules see clean args
sys.argv = [f"cortex {cmd}"] + sys.argv[2:]
if cmd == "version":
if cmd == "init":
from cortex.init import main as init_main
init_main()
elif cmd == "schedule":
from cortex.scheduler import main as schedule_main
schedule_main()
elif cmd == "version":
from cortex import __version__
print(f"cortex {__version__}")

129
cortex/init.py Normal file
View file

@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Cortex Init — first-time setup.
Usage:
cortex init [--home ~/.cortex] [--enable-all] [--non-interactive]
"""
import argparse
import json
import os
import sys
from pathlib import Path
from cortex.config import cortex_home, memory_dir, config_path, logs_dir
from cortex.scheduler import JOBS, enable_job
def _prompt(msg: str, default: str = "y") -> bool:
"""Simple y/n prompt."""
suffix = " [Y/n] " if default == "y" else " [y/N] "
try:
answer = input(msg + suffix).strip().lower()
except (EOFError, KeyboardInterrupt):
print()
return default == "y"
if not answer:
return default == "y"
return answer in ("y", "yes", "ja", "j")
def init(home: Path | None = None, enable_all: bool = False,
non_interactive: bool = False) -> bool:
"""Initialize cortex workspace."""
base = home or cortex_home()
mem = memory_dir()
logs = logs_dir()
cfg = config_path()
print(f"🧠 Cortex Init")
print(f" Home: {base}")
print(f" Memory: {mem}")
print(f" Config: {cfg}")
print()
# Create directories
for d in [base, mem, mem / "archive", logs]:
d.mkdir(parents=True, exist_ok=True)
print(f" 📁 {d}")
# Create default config if not exists
if not cfg.exists():
default_config = {
"version": "0.1.0",
"permanent_files": ["README.md"],
"sessions_dir": "~/.openclaw/agents/main/sessions",
}
cfg.write_text(json.dumps(default_config, indent=2) + "\n")
print(f" 📄 {cfg} (created)")
else:
print(f" 📄 {cfg} (exists)")
# Create default memory README
readme = mem / "README.md"
if not readme.exists():
readme.write_text("# Memory\n\nCortex memory directory. Files here are managed by `cortex hygiene`.\n")
print(f" 📄 {readme} (created)")
print()
# Smoke test
print(" 🔍 Smoke test...")
try:
from cortex.triage import score_task
r = score_task("test task")
print(f" ✅ Triage: priority={r.priority:.2f}")
except Exception as e:
print(f" ❌ Triage failed: {e}")
try:
from cortex.memory_hygiene import gather_stats
s = gather_stats()
print(f" ✅ Hygiene: {s['total_files']} files, {s['total_size_human']}")
except Exception as e:
print(f" ❌ Hygiene failed: {e}")
print()
# Schedule jobs
if enable_all:
jobs_to_enable = list(JOBS.keys())
elif non_interactive:
jobs_to_enable = []
else:
print(" 📅 Schedule periodic jobs?")
jobs_to_enable = []
for name, jdef in JOBS.items():
interval = jdef["default_interval_min"]
if _prompt(f" Enable {name} (every {interval}min)?"):
jobs_to_enable.append(name)
for job in jobs_to_enable:
ok = enable_job(job)
if ok:
print(f"{job} scheduled")
else:
print(f"{job} failed to schedule")
print()
print(" 🧠 Cortex ready!")
if jobs_to_enable:
print(f" Run 'cortex schedule status' to verify jobs.")
print(f" Run 'cortex --help' for all commands.")
return True
def main():
parser = argparse.ArgumentParser(description="Initialize Cortex")
parser.add_argument("--home", type=Path, help="Cortex home directory")
parser.add_argument("--enable-all", action="store_true",
help="Enable all scheduled jobs")
parser.add_argument("--non-interactive", action="store_true",
help="Skip interactive prompts")
args = parser.parse_args()
init(home=args.home, enable_all=args.enable_all,
non_interactive=args.non_interactive)
if __name__ == "__main__":
main()

448
cortex/scheduler.py Normal file
View file

@ -0,0 +1,448 @@
#!/usr/bin/env python3
"""Cortex Scheduler — manage periodic jobs via systemd (Linux) or launchd (macOS).
Usage:
cortex schedule list
cortex schedule enable <job> [--interval <minutes>]
cortex schedule disable <job>
cortex schedule status
cortex schedule logs <job> [--lines 50]
"""
import argparse
import json
import os
import platform
import shutil
import subprocess
import sys
import textwrap
from pathlib import Path
from typing import Optional
from cortex.config import cortex_home
# --- Job definitions ---
JOBS = {
"feedback": {
"description": "Extract lessons from session transcripts",
"command": "cortex feedback --since {interval}",
"default_interval_min": 360, # 6 hours
"calendar": "*-*-* 0/6:00:00", # systemd OnCalendar
},
"hygiene": {
"description": "Clean stale/duplicate/orphan memory files",
"command": "cortex hygiene duplicates && cortex hygiene stale && cortex hygiene archive",
"default_interval_min": 1440, # daily
"calendar": "*-*-* 04:00:00",
},
"health": {
"description": "Proactive system health scan",
"command": "cortex health --json",
"default_interval_min": 30,
"calendar": "*-*-* *:0/30:00",
},
}
def _is_linux() -> bool:
return platform.system() == "Linux"
def _is_macos() -> bool:
return platform.system() == "Darwin"
def _cortex_bin() -> str:
"""Find the cortex binary path."""
which = shutil.which("cortex")
if which:
return which
# Fallback: python -m cortex.cli
return f"{sys.executable} -m cortex.cli"
def _env_vars() -> dict[str, str]:
"""Collect CORTEX_* env vars to pass to scheduled jobs."""
env = {}
for key in ("CORTEX_HOME", "CORTEX_MEMORY_DIR", "CORTEX_CONFIG",
"CORTEX_GROWTH_LOG", "CORTEX_ROADMAP"):
val = os.environ.get(key)
if val:
env[key] = val
return env
# --- systemd (Linux) ---
def _systemd_dir() -> Path:
return Path.home() / ".config" / "systemd" / "user"
def _systemd_unit_name(job: str) -> str:
return f"cortex-{job}"
def _write_systemd_units(job: str, interval_min: int) -> tuple[Path, Path]:
"""Write .service and .timer files for a job."""
job_def = JOBS[job]
unit = _systemd_unit_name(job)
sdir = _systemd_dir()
sdir.mkdir(parents=True, exist_ok=True)
cortex_bin = _cortex_bin()
env_vars = _env_vars()
env_lines = "\n".join(f"Environment={k}={v}" for k, v in env_vars.items())
# For feedback, convert interval to --since format
command = job_def["command"]
if "{interval}" in command:
hours = max(1, interval_min // 60)
command = command.replace("{interval}", f"{hours}h")
service_path = sdir / f"{unit}.service"
service_path.write_text(textwrap.dedent(f"""\
[Unit]
Description=Cortex {job.title()} {job_def['description']}
[Service]
Type=oneshot
ExecStart=/bin/bash -c '{command}'
{env_lines}
Environment=PATH={os.environ.get('PATH', '/usr/local/bin:/usr/bin:/bin')}
[Install]
WantedBy=default.target
"""))
timer_path = sdir / f"{unit}.timer"
timer_path.write_text(textwrap.dedent(f"""\
[Unit]
Description=Cortex {job.title()} Timer every {interval_min}min
[Timer]
OnBootSec=5min
OnUnitActiveSec={interval_min}min
Persistent=true
[Install]
WantedBy=timers.target
"""))
return service_path, timer_path
def _enable_systemd(job: str, interval_min: int) -> bool:
service_path, timer_path = _write_systemd_units(job, interval_min)
unit = _systemd_unit_name(job)
try:
subprocess.run(["systemctl", "--user", "daemon-reload"], check=True, capture_output=True)
subprocess.run(["systemctl", "--user", "enable", "--now", f"{unit}.timer"],
check=True, capture_output=True)
return True
except subprocess.CalledProcessError as e:
print(f"Error enabling systemd timer: {e.stderr.decode()}", file=sys.stderr)
return False
def _disable_systemd(job: str) -> bool:
unit = _systemd_unit_name(job)
try:
subprocess.run(["systemctl", "--user", "disable", "--now", f"{unit}.timer"],
check=True, capture_output=True)
# Clean up unit files
sdir = _systemd_dir()
for ext in (".service", ".timer"):
f = sdir / f"{unit}{ext}"
if f.exists():
f.unlink()
subprocess.run(["systemctl", "--user", "daemon-reload"], capture_output=True)
return True
except subprocess.CalledProcessError as e:
print(f"Error disabling systemd timer: {e.stderr.decode()}", file=sys.stderr)
return False
def _status_systemd() -> list[dict]:
results = []
for job in JOBS:
unit = _systemd_unit_name(job)
try:
r = subprocess.run(
["systemctl", "--user", "is-active", f"{unit}.timer"],
capture_output=True, text=True
)
active = r.stdout.strip() == "active"
except FileNotFoundError:
active = False
# Get next trigger time
next_run = ""
if active:
try:
r = subprocess.run(
["systemctl", "--user", "show", f"{unit}.timer",
"--property=NextElapseUSecRealtime"],
capture_output=True, text=True
)
next_run = r.stdout.strip().split("=", 1)[-1] if "=" in r.stdout else ""
except Exception:
pass
results.append({
"job": job,
"description": JOBS[job]["description"],
"active": active,
"next_run": next_run,
})
return results
def _logs_systemd(job: str, lines: int = 50) -> str:
unit = _systemd_unit_name(job)
try:
r = subprocess.run(
["journalctl", "--user", "-u", f"{unit}.service",
f"--lines={lines}", "--no-pager"],
capture_output=True, text=True
)
return r.stdout
except FileNotFoundError:
return "journalctl not available"
# --- launchd (macOS) ---
def _launchd_dir() -> Path:
return Path.home() / "Library" / "LaunchAgents"
def _launchd_label(job: str) -> str:
return f"dev.cortex.{job}"
def _write_launchd_plist(job: str, interval_min: int) -> Path:
"""Write a launchd plist for a job."""
job_def = JOBS[job]
label = _launchd_label(job)
pdir = _launchd_dir()
pdir.mkdir(parents=True, exist_ok=True)
cortex_bin = _cortex_bin()
env_vars = _env_vars()
command = job_def["command"]
if "{interval}" in command:
hours = max(1, interval_min // 60)
command = command.replace("{interval}", f"{hours}h")
log_dir = cortex_home() / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
env_xml = ""
all_env = {**env_vars, "PATH": os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin")}
if all_env:
env_xml = " <key>EnvironmentVariables</key>\n <dict>\n"
for k, v in all_env.items():
env_xml += f" <key>{k}</key>\n <string>{v}</string>\n"
env_xml += " </dict>"
plist_path = pdir / f"{label}.plist"
plist_path.write_text(textwrap.dedent(f"""\
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>{label}</string>
<key>ProgramArguments</key>
<array>
<string>/bin/bash</string>
<string>-c</string>
<string>{command}</string>
</array>
<key>StartInterval</key>
<integer>{interval_min * 60}</integer>
<key>StandardOutPath</key>
<string>{log_dir / f'{job}.log'}</string>
<key>StandardErrorPath</key>
<string>{log_dir / f'{job}.err'}</string>
{env_xml}
</dict>
</plist>
"""))
return plist_path
def _enable_launchd(job: str, interval_min: int) -> bool:
plist_path = _write_launchd_plist(job, interval_min)
label = _launchd_label(job)
try:
# Unload first if already loaded
subprocess.run(["launchctl", "unload", str(plist_path)],
capture_output=True)
subprocess.run(["launchctl", "load", str(plist_path)],
check=True, capture_output=True)
return True
except subprocess.CalledProcessError as e:
print(f"Error loading launchd job: {e.stderr.decode()}", file=sys.stderr)
return False
def _disable_launchd(job: str) -> bool:
label = _launchd_label(job)
plist_path = _launchd_dir() / f"{label}.plist"
try:
subprocess.run(["launchctl", "unload", str(plist_path)],
check=True, capture_output=True)
if plist_path.exists():
plist_path.unlink()
return True
except subprocess.CalledProcessError as e:
print(f"Error unloading launchd job: {e.stderr.decode()}", file=sys.stderr)
return False
def _status_launchd() -> list[dict]:
results = []
for job in JOBS:
label = _launchd_label(job)
try:
r = subprocess.run(
["launchctl", "list", label],
capture_output=True, text=True
)
active = r.returncode == 0
except FileNotFoundError:
active = False
results.append({
"job": job,
"description": JOBS[job]["description"],
"active": active,
"next_run": "",
})
return results
def _logs_launchd(job: str, lines: int = 50) -> str:
log_path = cortex_home() / "logs" / f"{job}.log"
if not log_path.exists():
return f"No log file at {log_path}"
all_lines = log_path.read_text().splitlines()
return "\n".join(all_lines[-lines:])
# --- Unified interface ---
def enable_job(job: str, interval_min: Optional[int] = None) -> bool:
if job not in JOBS:
print(f"Unknown job: {job}. Available: {', '.join(JOBS.keys())}")
return False
interval = interval_min or JOBS[job]["default_interval_min"]
if _is_linux():
return _enable_systemd(job, interval)
elif _is_macos():
return _enable_launchd(job, interval)
else:
print(f"Unsupported platform: {platform.system()}")
return False
def disable_job(job: str) -> bool:
if job not in JOBS:
print(f"Unknown job: {job}. Available: {', '.join(JOBS.keys())}")
return False
if _is_linux():
return _disable_systemd(job)
elif _is_macos():
return _disable_launchd(job)
else:
print(f"Unsupported platform: {platform.system()}")
return False
def status() -> list[dict]:
if _is_linux():
return _status_systemd()
elif _is_macos():
return _status_launchd()
return []
def logs(job: str, lines: int = 50) -> str:
if _is_linux():
return _logs_systemd(job, lines)
elif _is_macos():
return _logs_launchd(job, lines)
return "Unsupported platform"
# --- CLI ---
def main():
parser = argparse.ArgumentParser(description="Cortex Scheduler")
sub = parser.add_subparsers(dest="action")
sub.add_parser("list", help="List available jobs")
sub.add_parser("status", help="Show active job status")
enable_p = sub.add_parser("enable", help="Enable a scheduled job")
enable_p.add_argument("job", choices=JOBS.keys())
enable_p.add_argument("--interval", type=int, help="Interval in minutes")
disable_p = sub.add_parser("disable", help="Disable a scheduled job")
disable_p.add_argument("job", choices=JOBS.keys())
logs_p = sub.add_parser("logs", help="Show job logs")
logs_p.add_argument("job", choices=JOBS.keys())
logs_p.add_argument("--lines", type=int, default=50)
args = parser.parse_args()
if args.action == "list":
print(f"{'Job':<12} {'Interval':<12} {'Description'}")
print("-" * 60)
for name, jdef in JOBS.items():
interval = f"{jdef['default_interval_min']}min"
print(f"{name:<12} {interval:<12} {jdef['description']}")
elif args.action == "status":
results = status()
if not results:
print(f"No scheduler support on {platform.system()}")
return
print(f"{'Job':<12} {'Status':<10} {'Next Run'}")
print("-" * 60)
for r in results:
st = "✅ active" if r["active"] else "⬚ inactive"
print(f"{r['job']:<12} {st:<10} {r['next_run']}")
elif args.action == "enable":
ok = enable_job(args.job, args.interval)
if ok:
interval = args.interval or JOBS[args.job]["default_interval_min"]
print(f"{args.job} enabled (every {interval}min)")
else:
print(f"❌ Failed to enable {args.job}")
sys.exit(1)
elif args.action == "disable":
ok = disable_job(args.job)
if ok:
print(f"{args.job} disabled")
else:
print(f"❌ Failed to disable {args.job}")
sys.exit(1)
elif args.action == "logs":
print(logs(args.job, args.lines))
else:
parser.print_help()
if __name__ == "__main__":
main()