#!/usr/bin/env python3
"""Cortex Scheduler — manage periodic jobs via systemd (Linux) or launchd (macOS).

Usage:
    cortex schedule list
    cortex schedule enable <job> [--interval <minutes>]
    cortex schedule disable <job>
    cortex schedule status
    cortex schedule logs <job> [--lines 50]
"""

import argparse
import json
import os
import platform
import shutil
import subprocess
import sys
import textwrap
from pathlib import Path
from typing import Optional

from cortex.config import cortex_home

# --- Job definitions ---

# Each entry maps a job name to:
#   description          - human-readable summary shown by `list` / `status`
#   command              - shell command line; "{interval}" is replaced with
#                          an hour count such as "6h" when units are written
#   default_interval_min - interval in minutes used when --interval is omitted
#   calendar             - systemd OnCalendar expression
#                          NOTE(review): not read by the unit writers visible
#                          in this module, which schedule by interval instead.
JOBS = {
    "feedback": {
        "description": "Extract lessons from session transcripts",
        "command": "cortex feedback --since {interval}",
        "default_interval_min": 360,  # 6 hours
        "calendar": "*-*-* 0/6:00:00",  # systemd OnCalendar
    },
    "hygiene": {
        "description": "Clean stale/duplicate/orphan memory files",
        "command": "cortex hygiene duplicates && cortex hygiene stale && cortex hygiene archive",
        "default_interval_min": 1440,  # daily
        "calendar": "*-*-* 04:00:00",
    },
    "health": {
        "description": "Proactive system health scan",
        "command": "cortex health --json",
        "default_interval_min": 30,
        "calendar": "*-*-* *:0/30:00",
    },
    "learn": {
        "description": "Extract preferences from NATS events",
        "command": "cortex learn --since {interval}",
        "default_interval_min": 360,  # 6 hours
        "calendar": "*-*-* 0/6:00:00",
    },
    "context": {
        "description": "Generate learning context from events",
        "command": "cortex context --events 2000",
        "default_interval_min": 360,  # 6 hours
        "calendar": "*-*-* 0/6:00:00",
    },
    "tracker": {
        "description": "Scan for commitments and contradictions",
        "command": "cortex track scan --since {interval}",
        "default_interval_min": 1440,  # daily
        "calendar": "*-*-* 06:00:00",
    },
    "sentinel": {
        "description": "Security feed aggregation and CVE matching",
        "command": "cortex sentinel scan && cortex sentinel matches",
        "default_interval_min": 360,  # 6 hours
        "calendar": "*-*-* 0/6:00:00",
    },
}


def _is_linux() -> bool:
    """Return True when running on Linux (systemd backend)."""
    return platform.system() == "Linux"


def _is_macos() -> bool:
    """Return True when running on macOS (launchd backend)."""
    return platform.system() == "Darwin"
def _cortex_bin() -> str:
    """Return the command used to invoke cortex.

    Prefers the ``cortex`` executable found on ``PATH``; otherwise falls back
    to running the CLI module with the current interpreter.
    """
    found = shutil.which("cortex")
    if found:
        return found
    # Fallback: python -m cortex.cli
    return f"{sys.executable} -m cortex.cli"


def _env_vars() -> dict[str, str]:
    """Collect the non-empty CORTEX_* env vars to pass to scheduled jobs."""
    passthrough = (
        "CORTEX_HOME",
        "CORTEX_MEMORY_DIR",
        "CORTEX_CONFIG",
        "CORTEX_GROWTH_LOG",
        "CORTEX_ROADMAP",
    )
    return {key: os.environ[key] for key in passthrough if os.environ.get(key)}


# --- systemd (Linux) ---


def _systemd_dir() -> Path:
    """Return the per-user systemd unit directory."""
    return Path.home() / ".config" / "systemd" / "user"


def _systemd_unit_name(job: str) -> str:
    """Return the unit base name (without extension) for *job*."""
    return f"cortex-{job}"


def _systemd_env_line(key: str, value: str) -> str:
    """Return one ``Environment=`` line with the assignment quoted.

    Quoting keeps values containing spaces as a single assignment. Backslash
    and double quote are escaped for the quoted string, and ``%`` is doubled
    so systemd does not treat it as a specifier.
    """
    escaped = value.replace("\\", "\\\\").replace('"', '\\"').replace("%", "%%")
    return f'Environment="{key}={escaped}"'


def _render_systemd_service(
    job: str,
    description: str,
    command: str,
    env_vars: dict[str, str],
    path_value: str,
) -> str:
    """Return the text of the ``.service`` unit for a job.

    The text is assembled line by line rather than through
    ``textwrap.dedent`` on an f-string: interpolating several environment
    lines into an indented template leaves the later lines unindented, which
    stops ``dedent`` from stripping anything.
    """
    lines = [
        "[Unit]",
        f"Description=Cortex {job.title()} — {description}",
        "",
        "[Service]",
        "Type=oneshot",
        f"ExecStart=/bin/bash -c '{command}'",
        *(_systemd_env_line(key, value) for key, value in env_vars.items()),
        _systemd_env_line("PATH", path_value),
        "",
        "[Install]",
        "WantedBy=default.target",
    ]
    return "\n".join(lines) + "\n"


def _render_systemd_timer(job: str, interval_min: int) -> str:
    """Return the text of the ``.timer`` unit firing every *interval_min*."""
    lines = [
        "[Unit]",
        f"Description=Cortex {job.title()} Timer — every {interval_min}min",
        "",
        "[Timer]",
        "OnBootSec=5min",
        f"OnUnitActiveSec={interval_min}min",
        "Persistent=true",
        "",
        "[Install]",
        "WantedBy=timers.target",
    ]
    return "\n".join(lines) + "\n"


def _write_systemd_units(job: str, interval_min: int) -> tuple[Path, Path]:
    """Write .service and .timer files for a job.

    Args:
        job: Key into ``JOBS``.
        interval_min: Timer interval in minutes.

    Returns:
        ``(service_path, timer_path)`` of the files written.

    Raises:
        KeyError: If *job* is not defined in ``JOBS``.
    """
    job_def = JOBS[job]
    unit = _systemd_unit_name(job)
    unit_dir = _systemd_dir()
    unit_dir.mkdir(parents=True, exist_ok=True)

    # Commands with an {interval} placeholder get it as whole hours
    # (rounded down, at least 1h).
    command = job_def["command"]
    if "{interval}" in command:
        hours = max(1, interval_min // 60)
        command = command.replace("{interval}", f"{hours}h")

    path_value = os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin")

    # The description contains a non-ASCII dash, so pin the encoding instead
    # of relying on the locale default.
    service_path = unit_dir / f"{unit}.service"
    service_path.write_text(
        _render_systemd_service(
            job, job_def["description"], command, _env_vars(), path_value
        ),
        encoding="utf-8",
    )

    timer_path = unit_dir / f"{unit}.timer"
    timer_path.write_text(
        _render_systemd_timer(job, interval_min), encoding="utf-8"
    )

    return service_path, timer_path


def _enable_systemd(job: str, interval_min: int) -> bool:
    """Write the units for *job* and enable + start its timer.

    Returns True on success; on failure prints the reason to stderr and
    returns False.
    """
    _write_systemd_units(job, interval_min)
    unit = _systemd_unit_name(job)
    try:
        subprocess.run(
            ["systemctl", "--user", "daemon-reload"],
            check=True,
            capture_output=True,
        )
        subprocess.run(
            ["systemctl", "--user", "enable", "--now", f"{unit}.timer"],
            check=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        print(
            f"Error enabling systemd timer: {e.stderr.decode(errors='replace')}",
            file=sys.stderr,
        )
        return False
    except FileNotFoundError:
        # systemctl itself is missing; report it like any other failure.
        print("Error enabling systemd timer: systemctl not found", file=sys.stderr)
        return False
    return True


def _disable_systemd(job: str) -> bool:
    """Disable + stop the timer for *job* and delete its unit files.

    Returns True on success; on failure prints the reason to stderr and
    returns False (unit files are left in place in that case).
    """
    unit = _systemd_unit_name(job)
    try:
        subprocess.run(
            ["systemctl", "--user", "disable", "--now", f"{unit}.timer"],
            check=True,
            capture_output=True,
        )
        # Clean up unit files
        unit_dir = _systemd_dir()
        for ext in (".service", ".timer"):
            (unit_dir / f"{unit}{ext}").unlink(missing_ok=True)
        # Best effort: the reload result is deliberately not checked.
        subprocess.run(
            ["systemctl", "--user", "daemon-reload"], capture_output=True
        )
    except subprocess.CalledProcessError as e:
        print(
            f"Error disabling systemd timer: {e.stderr.decode(errors='replace')}",
            file=sys.stderr,
        )
        return False
    except FileNotFoundError:
        print("Error disabling systemd timer: systemctl not found", file=sys.stderr)
        return False
    return True


def _status_systemd() -> list[dict]:
    """Return one status dict per job in ``JOBS``.

    Each dict has the keys ``job``, ``description``, ``active`` and
    ``next_run`` (empty unless the timer is active and systemd reports a
    value for ``NextElapseUSecRealtime``).
    """
    results = []
    for job in JOBS:
        unit = _systemd_unit_name(job)
        try:
            probe = subprocess.run(
                ["systemctl", "--user", "is-active", f"{unit}.timer"],
                capture_output=True,
                text=True,
            )
            active = probe.stdout.strip() == "active"
        except FileNotFoundError:
            active = False

        # Get next trigger time
        next_run = ""
        if active:
            try:
                shown = subprocess.run(
                    [
                        "systemctl", "--user", "show", f"{unit}.timer",
                        "--property=NextElapseUSecRealtime",
                    ],
                    capture_output=True,
                    text=True,
                )
            except (OSError, subprocess.SubprocessError):
                # The next-run time is informational only; leave it blank.
                pass
            else:
                # Output has the form "Property=value".
                _, sep, value = shown.stdout.strip().partition("=")
                next_run = value if sep else ""

        results.append({
            "job": job,
            "description": JOBS[job]["description"],
            "active": active,
            "next_run": next_run,
        })
    return results


def _logs_systemd(job: str, lines: int = 50) -> str:
    """Return the last *lines* journal lines for the job's service unit."""
    unit = _systemd_unit_name(job)
    try:
        result = subprocess.run(
            [
                "journalctl", "--user", "-u", f"{unit}.service",
                f"--lines={lines}", "--no-pager",
            ],
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        return "journalctl not available"
    return result.stdout


# --- launchd (macOS) ---


def _launchd_dir() -> Path:
    """Return the per-user LaunchAgents directory."""
    return Path.home() / "Library" / "LaunchAgents"
def _launchd_label(job: str) -> str:
    """Return the launchd label for *job*."""
    return f"dev.cortex.{job}"


def _launchd_plist_payload(
    job: str,
    command: str,
    interval_min: int,
    log_dir: Path,
    env: dict,
) -> dict:
    """Build the launchd job description for *job* as a plain dict."""
    return {
        "Label": _launchd_label(job),
        "ProgramArguments": ["/bin/bash", "-c", command],
        "StartInterval": interval_min * 60,  # launchd expects seconds
        "StandardOutPath": str(log_dir / f"{job}.log"),
        "StandardErrorPath": str(log_dir / f"{job}.err"),
        "EnvironmentVariables": dict(env),
    }


def _tail_lines(text: str, lines: int) -> str:
    """Return the last *lines* lines of *text* (nothing when lines <= 0)."""
    if lines <= 0:
        # A plain [-0:] slice would return every line instead of none.
        return ""
    return "\n".join(text.splitlines()[-lines:])


def _write_launchd_plist(job: str, interval_min: int) -> Path:
    """Write a launchd plist for a job.

    Args:
        job: Key into ``JOBS``.
        interval_min: Run interval in minutes.

    Returns:
        Path of the plist written into the LaunchAgents directory.

    Raises:
        KeyError: If *job* is not defined in ``JOBS``.
    """
    # Imported here so the module's top-level import block stays untouched.
    import plistlib

    job_def = JOBS[job]
    agents_dir = _launchd_dir()
    agents_dir.mkdir(parents=True, exist_ok=True)

    # Commands with an {interval} placeholder get it as whole hours
    # (rounded down, at least 1h).
    command = job_def["command"]
    if "{interval}" in command:
        hours = max(1, interval_min // 60)
        command = command.replace("{interval}", f"{hours}h")

    log_dir = cortex_home() / "logs"
    log_dir.mkdir(parents=True, exist_ok=True)

    env = {
        **_env_vars(),
        "PATH": os.environ.get("PATH", "/usr/local/bin:/usr/bin:/bin"),
    }

    # plistlib emits a well-formed XML plist and escapes values; several job
    # commands contain "&&", which must not appear raw inside XML text.
    plist_path = agents_dir / f"{_launchd_label(job)}.plist"
    plist_path.write_bytes(
        plistlib.dumps(
            _launchd_plist_payload(job, command, interval_min, log_dir, env)
        )
    )
    return plist_path


def _enable_launchd(job: str, interval_min: int) -> bool:
    """Write the plist for *job* and (re)load it into launchd.

    Returns True on success; on failure prints the reason to stderr and
    returns False.
    """
    plist_path = _write_launchd_plist(job, interval_min)
    try:
        # Unload first if already loaded; failure here is expected when the
        # job was not loaded, so the result is not checked.
        subprocess.run(
            ["launchctl", "unload", str(plist_path)], capture_output=True
        )
        subprocess.run(
            ["launchctl", "load", str(plist_path)],
            check=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        print(
            f"Error loading launchd job: {e.stderr.decode(errors='replace')}",
            file=sys.stderr,
        )
        return False
    except FileNotFoundError:
        print("Error loading launchd job: launchctl not found", file=sys.stderr)
        return False
    return True


def _disable_launchd(job: str) -> bool:
    """Unload the launchd job for *job* and delete its plist.

    Returns True on success; on failure prints the reason to stderr and
    returns False.
    """
    plist_path = _launchd_dir() / f"{_launchd_label(job)}.plist"
    try:
        subprocess.run(
            ["launchctl", "unload", str(plist_path)],
            check=True,
            capture_output=True,
        )
        plist_path.unlink(missing_ok=True)
    except subprocess.CalledProcessError as e:
        print(
            f"Error unloading launchd job: {e.stderr.decode(errors='replace')}",
            file=sys.stderr,
        )
        return False
    except FileNotFoundError:
        print("Error unloading launchd job: launchctl not found", file=sys.stderr)
        return False
    return True


def _status_launchd() -> list[dict]:
    """Return one status dict per job in ``JOBS``.

    A job counts as active when ``launchctl list <label>`` exits with 0.
    ``next_run`` is always empty on this backend.
    """
    results = []
    for job in JOBS:
        try:
            probe = subprocess.run(
                ["launchctl", "list", _launchd_label(job)],
                capture_output=True,
                text=True,
            )
            active = probe.returncode == 0
        except FileNotFoundError:
            active = False
        results.append({
            "job": job,
            "description": JOBS[job]["description"],
            "active": active,
            "next_run": "",
        })
    return results


def _logs_launchd(job: str, lines: int = 50) -> str:
    """Return the last *lines* lines of the job's stdout log file."""
    log_path = cortex_home() / "logs" / f"{job}.log"
    if not log_path.exists():
        return f"No log file at {log_path}"
    return _tail_lines(log_path.read_text(errors="replace"), lines)


# --- Unified interface ---


def enable_job(job: str, interval_min: Optional[int] = None) -> bool:
    """Enable the scheduled *job* on the current platform.

    Args:
        job: Key into ``JOBS``.
        interval_min: Interval in minutes; when omitted (or 0) the job's
            ``default_interval_min`` is used.

    Returns:
        True when the job was enabled, False for an unknown job, a
        non-positive interval, an unsupported platform, or a backend failure.
    """
    if job not in JOBS:
        print(f"Unknown job: {job}. Available: {', '.join(JOBS.keys())}")
        return False

    interval = interval_min or JOBS[job]["default_interval_min"]
    if interval <= 0:
        # A negative interval would be written verbatim into the timer/plist.
        print(f"Invalid interval: {interval} (must be a positive number of minutes)")
        return False

    if _is_linux():
        return _enable_systemd(job, interval)
    if _is_macos():
        return _enable_launchd(job, interval)
    print(f"Unsupported platform: {platform.system()}")
    return False


def disable_job(job: str) -> bool:
    """Disable the scheduled *job* on the current platform.

    Returns True on success, False for an unknown job, an unsupported
    platform, or a backend failure.
    """
    if job not in JOBS:
        print(f"Unknown job: {job}. Available: {', '.join(JOBS.keys())}")
        return False

    if _is_linux():
        return _disable_systemd(job)
    if _is_macos():
        return _disable_launchd(job)
    print(f"Unsupported platform: {platform.system()}")
    return False


def status() -> list[dict]:
    """Return per-job status dicts, or an empty list on unsupported platforms."""
    if _is_linux():
        return _status_systemd()
    if _is_macos():
        return _status_launchd()
    return []


def logs(job: str, lines: int = 50) -> str:
    """Return the most recent log output for *job* from the platform backend."""
    if _is_linux():
        return _logs_systemd(job, lines)
    if _is_macos():
        return _logs_launchd(job, lines)
    return "Unsupported platform"


# --- CLI ---


def main():
    """Parse the command line and run the requested scheduler action.

    Exits with status 1 when enabling or disabling a job fails.
    """
    parser = argparse.ArgumentParser(description="Cortex Scheduler")
    sub = parser.add_subparsers(dest="action")

    sub.add_parser("list", help="List available jobs")
    sub.add_parser("status", help="Show active job status")

    enable_p = sub.add_parser("enable", help="Enable a scheduled job")
    enable_p.add_argument("job", choices=JOBS.keys())
    enable_p.add_argument("--interval", type=int, help="Interval in minutes")

    disable_p = sub.add_parser("disable", help="Disable a scheduled job")
    disable_p.add_argument("job", choices=JOBS.keys())

    logs_p = sub.add_parser("logs", help="Show job logs")
    logs_p.add_argument("job", choices=JOBS.keys())
    logs_p.add_argument("--lines", type=int, default=50)

    args = parser.parse_args()

    if args.action == "list":
        print(f"{'Job':<12} {'Interval':<12} {'Description'}")
        print("-" * 60)
        for name, jdef in JOBS.items():
            interval = f"{jdef['default_interval_min']}min"
            print(f"{name:<12} {interval:<12} {jdef['description']}")

    elif args.action == "status":
        results = status()
        if not results:
            print(f"No scheduler support on {platform.system()}")
            return
        print(f"{'Job':<12} {'Status':<10} {'Next Run'}")
        print("-" * 60)
        for r in results:
            st = "✅ active" if r["active"] else "⬚ inactive"
            print(f"{r['job']:<12} {st:<10} {r['next_run']}")

    elif args.action == "enable":
        if enable_job(args.job, args.interval):
            interval = args.interval or JOBS[args.job]["default_interval_min"]
            print(f"✅ {args.job} enabled (every {interval}min)")
        else:
            print(f"❌ Failed to enable {args.job}")
            sys.exit(1)

    elif args.action == "disable":
        if disable_job(args.job):
            print(f"⬚ {args.job} disabled")
        else:
            print(f"❌ Failed to disable {args.job}")
            sys.exit(1)

    elif args.action == "logs":
        print(logs(args.job, args.lines))

    else:
        parser.print_help()


if __name__ == "__main__":
    main()